HDFS-10930. Refactor: Wrap Datanode IO related operations. Contributed by Xiaoyu Yao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/df983b52
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/df983b52
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/df983b52

Branch: refs/heads/YARN-5734
Commit: df983b524ab68ea0c70cee9033bfff2d28052cbf
Parents: 43cb167
Author: Xiaoyu Yao <x...@apache.org>
Authored: Mon Dec 5 13:04:39 2016 -0800
Committer: Xiaoyu Yao <x...@apache.org>
Committed: Tue Dec 6 11:05:47 2016 -0800

----------------------------------------------------------------------
 .../hdfs/server/datanode/BlockReceiver.java     |  66 +++----
 .../hdfs/server/datanode/BlockSender.java       | 105 ++++-------
 .../hadoop/hdfs/server/datanode/DNConf.java     |   4 +
 .../hdfs/server/datanode/DataStorage.java       |   5 +
 .../hdfs/server/datanode/LocalReplica.java      | 179 +++++++++++++------
 .../server/datanode/LocalReplicaInPipeline.java |  30 ++--
 .../hdfs/server/datanode/ReplicaInPipeline.java |   4 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |   3 +-
 .../datanode/fsdataset/ReplicaInputStreams.java | 102 ++++++++++-
 .../fsdataset/ReplicaOutputStreams.java         | 107 ++++++++++-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |  97 +++++-----
 .../impl/FsDatasetAsyncDiskService.java         |   7 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   5 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |   5 +-
 .../org/apache/hadoop/hdfs/TestFileAppend.java  |   2 +-
 .../server/datanode/SimulatedFSDataset.java     |   8 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   2 +-
 .../server/datanode/TestSimulatedFSDataset.java |   2 +-
 .../extdataset/ExternalDatasetImpl.java         |   4 +-
 .../extdataset/ExternalReplicaInPipeline.java   |   6 +-
 20 files changed, 470 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
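
The recurring pattern in this patch is that BlockReceiver and BlockSender stop holding raw streams and FileDescriptors and instead go through ReplicaInputStreams/ReplicaOutputStreams, which own the descriptors and time each disk operation against the DataNode slow-IO threshold. A minimal standalone sketch of that timing idea follows (the class, logger, and names below are illustrative only, not the patch code); BlockReceiver keeps the returned duration to track its maximum write time, as in the receivePacket() hunk further down.

import java.io.IOException;
import java.io.OutputStream;
import java.util.logging.Logger;

/** Illustrative only: wraps an OutputStream and warns when a write is slow. */
class TimedBlockWriter {
  private static final Logger LOG = Logger.getLogger("TimedBlockWriter");
  private final OutputStream dataOut;
  private final long slowLogThresholdMs; // e.g. DNConf#getSlowIoWarningThresholdMs()

  TimedBlockWriter(OutputStream dataOut, long slowLogThresholdMs) {
    this.dataOut = dataOut;
    this.slowLogThresholdMs = slowLogThresholdMs;
  }

  /** Write one packet and return the elapsed time, mirroring writeToDisk(). */
  long writeToDisk(byte[] b, int off, int len) throws IOException {
    long begin = System.nanoTime();
    dataOut.write(b, off, len);
    long durationMs = (System.nanoTime() - begin) / 1_000_000;
    if (durationMs > slowLogThresholdMs) {
      LOG.warning("Slow write took " + durationMs + " ms (threshold="
          + slowLogThresholdMs + " ms)");
    }
    return durationMs;
  }
}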


http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 39419c1..f372072 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -24,10 +24,7 @@ import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
-import java.io.FileDescriptor;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.nio.ByteBuffer;
@@ -53,7 +50,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
@@ -88,8 +84,6 @@ class BlockReceiver implements Closeable {
    * the DataNode needs to recalculate checksums before writing.
    */
   private final boolean needsChecksumTranslation;
-  private OutputStream out = null; // to block file at local disk
-  private FileDescriptor outFd;
   private DataOutputStream checksumOut = null; // to crc file at local disk
   private final int bytesPerChecksum;
   private final int checksumSize;
@@ -250,7 +244,8 @@ class BlockReceiver implements Closeable {
       
       final boolean isCreate = isDatanode || isTransfer 
           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
-      streams = replicaInfo.createStreams(isCreate, requestedChecksum);
+      streams = replicaInfo.createStreams(isCreate, requestedChecksum,
+          datanodeSlowLogThresholdMs);
       assert streams != null : "null streams!";
 
       // read checksum meta information
@@ -260,13 +255,6 @@ class BlockReceiver implements Closeable {
       this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
       this.checksumSize = diskChecksum.getChecksumSize();
 
-      this.out = streams.getDataOut();
-      if (out instanceof FileOutputStream) {
-        this.outFd = ((FileOutputStream)out).getFD();
-      } else {
-        LOG.warn("Could not get file descriptor for outputstream of class " +
-            out.getClass());
-      }
       this.checksumOut = new DataOutputStream(new BufferedOutputStream(
           streams.getChecksumOut(), DFSUtilClient.getSmallBufferSize(
           datanode.getConf())));
@@ -319,7 +307,7 @@ class BlockReceiver implements Closeable {
     packetReceiver.close();
 
     IOException ioe = null;
-    if (syncOnClose && (out != null || checksumOut != null)) {
+    if (syncOnClose && (streams.getDataOut() != null || checksumOut != null)) {
       datanode.metrics.incrFsyncCount();      
     }
     long flushTotalNanos = 0;
@@ -348,9 +336,9 @@ class BlockReceiver implements Closeable {
     }
     // close block file
     try {
-      if (out != null) {
+      if (streams.getDataOut() != null) {
         long flushStartNanos = System.nanoTime();
-        out.flush();
+        streams.flushDataOut();
         long flushEndNanos = System.nanoTime();
         if (syncOnClose) {
           long fsyncStartNanos = flushEndNanos;
@@ -359,14 +347,13 @@ class BlockReceiver implements Closeable {
         }
         flushTotalNanos += flushEndNanos - flushStartNanos;
         measuredFlushTime = true;
-        out.close();
-        out = null;
+        streams.closeDataStream();
       }
     } catch (IOException e) {
       ioe = e;
     }
     finally{
-      IOUtils.closeStream(out);
+      streams.close();
     }
     if (replicaHandler != null) {
       IOUtils.cleanup(null, replicaHandler);
@@ -419,9 +406,9 @@ class BlockReceiver implements Closeable {
       }
       flushTotalNanos += flushEndNanos - flushStartNanos;
     }
-    if (out != null) {
+    if (streams.getDataOut() != null) {
       long flushStartNanos = System.nanoTime();
-      out.flush();
+      streams.flushDataOut();
       long flushEndNanos = System.nanoTime();
       if (isSync) {
         long fsyncStartNanos = flushEndNanos;
@@ -430,10 +417,10 @@ class BlockReceiver implements Closeable {
       }
       flushTotalNanos += flushEndNanos - flushStartNanos;
     }
-    if (checksumOut != null || out != null) {
+    if (checksumOut != null || streams.getDataOut() != null) {
       datanode.metrics.addFlushNanos(flushTotalNanos);
       if (isSync) {
-         datanode.metrics.incrFsyncCount();      
+        datanode.metrics.incrFsyncCount();
       }
     }
     long duration = Time.monotonicNow() - begin;
@@ -716,16 +703,12 @@ class BlockReceiver implements Closeable {
           int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
           
           // Write data to disk.
-          long begin = Time.monotonicNow();
-          out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
-          long duration = Time.monotonicNow() - begin;
+          long duration = streams.writeToDisk(dataBuf.array(),
+              startByteToDisk, numBytesToDisk);
+
           if (duration > maxWriteToDiskMs) {
             maxWriteToDiskMs = duration;
           }
-          if (duration > datanodeSlowLogThresholdMs) {
-            LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
-                + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
-          }
 
           final byte[] lastCrc;
           if (shouldNotWriteChecksum) {
@@ -842,7 +825,7 @@ class BlockReceiver implements Closeable {
 
   private void manageWriterOsCache(long offsetInBlock) {
     try {
-      if (outFd != null &&
+      if (streams.getOutFd() != null &&
           offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
         long begin = Time.monotonicNow();
         //
@@ -857,12 +840,11 @@ class BlockReceiver implements Closeable {
         if (syncBehindWrites) {
           if (syncBehindWritesInBackground) {
             this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest(
-                block, outFd, lastCacheManagementOffset,
+                block, streams, lastCacheManagementOffset,
                 offsetInBlock - lastCacheManagementOffset,
                 SYNC_FILE_RANGE_WRITE);
           } else {
-            NativeIO.POSIX.syncFileRangeIfPossible(outFd,
-                lastCacheManagementOffset,
+            streams.syncFileRangeIfPossible(lastCacheManagementOffset,
                 offsetInBlock - lastCacheManagementOffset,
                 SYNC_FILE_RANGE_WRITE);
           }
@@ -879,8 +861,8 @@ class BlockReceiver implements Closeable {
         //                     
         long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
         if (dropPos > 0 && dropCacheBehindWrites) {
-          NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
-              block.getBlockName(), outFd, 0, dropPos, POSIX_FADV_DONTNEED);
+          streams.dropCacheBehindWrites(block.getBlockName(), 0, dropPos,
+              POSIX_FADV_DONTNEED);
         }
         lastCacheManagementOffset = offsetInBlock;
         long duration = Time.monotonicNow() - begin;
@@ -989,7 +971,7 @@ class BlockReceiver implements Closeable {
               // The worst case is not recovering this RBW replica. 
               // Client will fall back to regular pipeline recovery.
             } finally {
-              IOUtils.closeStream(out);
+              IOUtils.closeStream(streams.getDataOut());
             }
             try {              
               // Even if the connection is closed after the ack packet is
@@ -1047,8 +1029,8 @@ class BlockReceiver implements Closeable {
    * will be overwritten.
    */
   private void adjustCrcFilePosition() throws IOException {
-    if (out != null) {
-     out.flush();
+    if (streams.getDataOut() != null) {
+      streams.flushDataOut();
     }
     if (checksumOut != null) {
       checksumOut.flush();
@@ -1094,10 +1076,10 @@ class BlockReceiver implements Closeable {
     byte[] crcbuf = new byte[checksumSize];
     try (ReplicaInputStreams instr =
         datanode.data.getTmpInputStreams(block, blkoff, ckoff)) {
-      IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk);
+      instr.readDataFully(buf, 0, sizePartialChunk);
 
       // open meta file and read in crc value computer earlier
-      IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length);
+      instr.readChecksumFully(crcbuf, 0, crcbuf.length);
     }
 
     // compute crc of partial chunk from data read in the block file.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index a1b1f86..9182c88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.FileDescriptor;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -42,11 +41,11 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
-import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
@@ -120,12 +119,11 @@ class BlockSender implements java.io.Closeable {
   
   /** the block to read from */
   private final ExtendedBlock block;
-  /** Stream to read block data from */
-  private InputStream blockIn;
+
+  /** InputStreams and file descriptors to read block/checksum. */
+  private ReplicaInputStreams ris;
   /** updated while using transferTo() */
   private long blockInPosition = -1;
-  /** Stream to read checksum */
-  private DataInputStream checksumIn;
   /** Checksum utility */
   private final DataChecksum checksum;
   /** Initial position to read */
@@ -152,11 +150,6 @@ class BlockSender implements java.io.Closeable {
   private final String clientTraceFmt;
   private volatile ChunkChecksum lastChunkChecksum = null;
   private DataNode datanode;
-  
-  /** The file descriptor of the block being sent */
-  private FileDescriptor blockInFd;
-  /** The reference to the volume where the block is located */
-  private FsVolumeReference volumeRef;
 
   /** The replica of the block that is being read. */
   private final Replica replica;
@@ -201,6 +194,9 @@ class BlockSender implements java.io.Closeable {
               boolean sendChecksum, DataNode datanode, String clientTraceFmt,
               CachingStrategy cachingStrategy)
       throws IOException {
+    InputStream blockIn = null;
+    DataInputStream checksumIn = null;
+    FsVolumeReference volumeRef = null;
     try {
       this.block = block;
       this.corruptChecksumOk = corruptChecksumOk;
@@ -281,7 +277,7 @@ class BlockSender implements java.io.Closeable {
         (!is32Bit || length <= Integer.MAX_VALUE);
 
       // Obtain a reference before reading data
-      this.volumeRef = datanode.data.getVolume(block).obtainReference();
+      volumeRef = datanode.data.getVolume(block).obtainReference();
 
       /* 
        * (corruptChecksumOK, meta_file_exist): operation
@@ -405,14 +401,9 @@ class BlockSender implements java.io.Closeable {
         DataNode.LOG.debug("replica=" + replica);
       }
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
-      if (blockIn instanceof FileInputStream) {
-        blockInFd = ((FileInputStream)blockIn).getFD();
-      } else {
-        blockInFd = null;
-      }
+      ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef);
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
-      IOUtils.closeStream(blockIn);
       throw ioe;
     }
   }
@@ -422,12 +413,11 @@ class BlockSender implements java.io.Closeable {
    */
   @Override
   public void close() throws IOException {
-    if (blockInFd != null &&
+    if (ris.getDataInFd() != null &&
         ((dropCacheBehindAllReads) ||
          (dropCacheBehindLargeReads && isLongRead()))) {
       try {
-        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
-            block.getBlockName(), blockInFd, lastCacheDropOffset,
+        ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset,
             offset - lastCacheDropOffset, POSIX_FADV_DONTNEED);
       } catch (Exception e) {
         LOG.warn("Unable to drop cache on file close", e);
@@ -436,32 +426,12 @@ class BlockSender implements java.io.Closeable {
     if (curReadahead != null) {
       curReadahead.cancel();
     }
-    
-    IOException ioe = null;
-    if(checksumIn!=null) {
-      try {
-        checksumIn.close(); // close checksum file
-      } catch (IOException e) {
-        ioe = e;
-      }
-      checksumIn = null;
-    }   
-    if(blockIn!=null) {
-      try {
-        blockIn.close(); // close data file
-      } catch (IOException e) {
-        ioe = e;
-      }
-      blockIn = null;
-      blockInFd = null;
-    }
-    if (volumeRef != null) {
-      IOUtils.cleanup(null, volumeRef);
-      volumeRef = null;
-    }
-    // throw IOException if there is any
-    if(ioe!= null) {
-      throw ioe;
+
+    try {
+      ris.closeStreams();
+    } finally {
+      IOUtils.closeStream(ris);
+      ris = null;
     }
   }
   
@@ -565,7 +535,7 @@ class BlockSender implements java.io.Closeable {
     int checksumOff = pkt.position();
     byte[] buf = pkt.array();
     
-    if (checksumSize > 0 && checksumIn != null) {
+    if (checksumSize > 0 && ris.getChecksumIn() != null) {
       readChecksum(buf, checksumOff, checksumDataLen);
 
       // write in progress that we need to use to get last checksum
@@ -581,7 +551,7 @@ class BlockSender implements java.io.Closeable {
     
     int dataOff = checksumOff + checksumDataLen;
     if (!transferTo) { // normal transfer
-      IOUtils.readFully(blockIn, buf, dataOff, dataLen);
+      ris.readDataFully(buf, dataOff, dataLen);
 
       if (verifyChecksum) {
         verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
@@ -593,12 +563,12 @@ class BlockSender implements java.io.Closeable {
         SocketOutputStream sockOut = (SocketOutputStream)out;
         // First write header and checksums
         sockOut.write(buf, headerOff, dataOff - headerOff);
-        
+
         // no need to flush since we know out is not a buffered stream
-        FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
+        FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel();
         LongWritable waitTime = new LongWritable();
         LongWritable transferTime = new LongWritable();
-        sockOut.transferToFully(fileCh, blockInPosition, dataLen, 
+        sockOut.transferToFully(fileCh, blockInPosition, dataLen,
             waitTime, transferTime);
         datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
         datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
@@ -630,7 +600,7 @@ class BlockSender implements java.io.Closeable {
         if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
           LOG.error("BlockSender.sendChunks() exception: ", e);
           datanode.getBlockScanner().markSuspectBlock(
-              volumeRef.getVolume().getStorageID(),
+              ris.getVolumeRef().getVolume().getStorageID(),
               block);
         }
       }
@@ -653,16 +623,15 @@ class BlockSender implements java.io.Closeable {
    */
   private void readChecksum(byte[] buf, final int checksumOffset,
       final int checksumLen) throws IOException {
-    if (checksumSize <= 0 && checksumIn == null) {
+    if (checksumSize <= 0 && ris.getChecksumIn() == null) {
       return;
     }
     try {
-      checksumIn.readFully(buf, checksumOffset, checksumLen);
+      ris.readChecksumFully(buf, checksumOffset, checksumLen);
     } catch (IOException e) {
       LOG.warn(" Could not read or failed to verify checksum for data"
           + " at offset " + offset + " for block " + block, e);
-      IOUtils.closeStream(checksumIn);
-      checksumIn = null;
+      ris.closeChecksumStream();
       if (corruptChecksumOk) {
         if (checksumOffset < checksumLen) {
           // Just fill the array with zeros.
@@ -746,10 +715,10 @@ class BlockSender implements java.io.Closeable {
     
     lastCacheDropOffset = initialOffset;
 
-    if (isLongRead() && blockInFd != null) {
+    if (isLongRead() && ris.getDataInFd() != null) {
       // Advise that this file descriptor will be accessed sequentially.
-      NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
-          block.getBlockName(), blockInFd, 0, 0, POSIX_FADV_SEQUENTIAL);
+      ris.dropCacheBehindReads(block.getBlockName(), 0, 0,
+          POSIX_FADV_SEQUENTIAL);
     }
     
     // Trigger readahead of beginning of file if configured.
@@ -761,9 +730,10 @@ class BlockSender implements java.io.Closeable {
       int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
       boolean transferTo = transferToAllowed && !verifyChecksum
           && baseStream instanceof SocketOutputStream
-          && blockIn instanceof FileInputStream;
+          && ris.getDataIn() instanceof FileInputStream;
       if (transferTo) {
-        FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
+        FileChannel fileChannel =
+            ((FileInputStream)ris.getDataIn()).getChannel();
         blockInPosition = fileChannel.position();
         streamForSendChunks = baseStream;
         maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
@@ -818,14 +788,16 @@ class BlockSender implements java.io.Closeable {
   private void manageOsCache() throws IOException {
     // We can't manage the cache for this block if we don't have a file
     // descriptor to work with.
-    if (blockInFd == null) return;
+    if (ris.getDataInFd() == null) {
+      return;
+    }
 
     // Perform readahead if necessary
     if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
           (alwaysReadahead || isLongRead())) {
       curReadahead = datanode.readaheadPool.readaheadStream(
-          clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
-          curReadahead);
+          clientTraceFmt, ris.getDataInFd(), offset, readaheadLength,
+          Long.MAX_VALUE, curReadahead);
     }
 
     // Drop what we've just read from cache, since we aren't
@@ -835,8 +807,7 @@ class BlockSender implements java.io.Closeable {
       long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
       if (offset >= nextCacheDropOffset) {
         long dropLength = offset - lastCacheDropOffset;
-        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
-            block.getBlockName(), blockInFd, lastCacheDropOffset,
+        ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset,
             dropLength, POSIX_FADV_DONTNEED);
         lastCacheDropOffset = offset;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 823d05c..c1487b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -402,6 +402,10 @@ public class DNConf {
     return volsConfigured;
   }
 
+  public long getSlowIoWarningThresholdMs() {
+    return datanodeSlowIoWarningThresholdMs;
+  }
+
   int getMaxDataLength() {
     return maxDataLength;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 29b14e7..f4deb6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -1355,4 +1355,9 @@ public class DataStorage extends Storage {
   synchronized void removeBlockPoolStorage(String bpId) {
     bpStorageMap.remove(bpId);
   }
+
+  public static boolean fullyDelete(final File dir) {
+    boolean result = FileUtil.fullyDelete(dir);
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
index f829111..e6f7e12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
@@ -29,9 +29,6 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -46,6 +43,8 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -69,15 +68,7 @@ abstract public class LocalReplica extends ReplicaInfo {
 
   private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
 
-  static final Log LOG = LogFactory.getLog(LocalReplica.class);
-  private final static boolean IS_NATIVE_IO_AVAIL;
-  static {
-    IS_NATIVE_IO_AVAIL = NativeIO.isAvailable();
-    if (Path.WINDOWS && !IS_NATIVE_IO_AVAIL) {
-      LOG.warn("Data node cannot fully support concurrent reading"
-          + " and writing without native code extensions on Windows.");
-    }
-  }
+  static final Logger LOG = LoggerFactory.getLogger(LocalReplica.class);
 
   /**
    * Constructor
@@ -199,14 +190,14 @@ abstract public class LocalReplica extends ReplicaInfo {
     File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
     try (FileInputStream in = new FileInputStream(file)) {
       try (FileOutputStream out = new FileOutputStream(tmpFile)){
-        IOUtils.copyBytes(in, out, 16 * 1024);
+        copyBytes(in, out, 16 * 1024);
       }
       if (file.length() != tmpFile.length()) {
         throw new IOException("Copy of file " + file + " size " + file.length()+
                               " into file " + tmpFile +
                               " resulted in a size of " + tmpFile.length());
       }
-      FileUtil.replaceFile(tmpFile, file);
+      replaceFile(tmpFile, file);
     } catch (IOException e) {
       boolean done = tmpFile.delete();
       if (!done) {
@@ -241,13 +232,13 @@ abstract public class LocalReplica extends ReplicaInfo {
     }
     File meta = getMetaFile();
 
-    int linkCount = HardLink.getLinkCount(file);
+    int linkCount = getHardLinkCount(file);
     if (linkCount > 1) {
       DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
           "block " + this);
       breakHardlinks(file, this);
     }
-    if (HardLink.getLinkCount(meta) > 1) {
+    if (getHardLinkCount(meta) > 1) {
       breakHardlinks(meta, this);
     }
     return true;
@@ -260,18 +251,7 @@ abstract public class LocalReplica extends ReplicaInfo {
 
   @Override
   public InputStream getDataInputStream(long seekOffset) throws IOException {
-
-    File blockFile = getBlockFile();
-    if (IS_NATIVE_IO_AVAIL) {
-      return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
-    } else {
-      try {
-        return FsDatasetUtil.openAndSeek(blockFile, seekOffset);
-      } catch (FileNotFoundException fnfe) {
-        throw new IOException("Block " + this + " is not valid. " +
-            "Expected block file at " + blockFile + " does not exist.");
-      }
-    }
+    return getDataInputStream(getBlockFile(), seekOffset);
   }
 
   @Override
@@ -286,7 +266,7 @@ abstract public class LocalReplica extends ReplicaInfo {
 
   @Override
   public boolean deleteBlockData() {
-    return getBlockFile().delete();
+    return fullyDelete(getBlockFile());
   }
 
   @Override
@@ -320,7 +300,7 @@ abstract public class LocalReplica extends ReplicaInfo {
 
   @Override
   public boolean deleteMetadata() {
-    return getMetaFile().delete();
+    return fullyDelete(getMetaFile());
   }
 
   @Override
@@ -340,7 +320,7 @@ abstract public class LocalReplica extends ReplicaInfo {
 
   private boolean renameFile(File srcfile, File destfile) throws IOException {
     try {
-      NativeIO.renameTo(srcfile, destfile);
+      rename(srcfile, destfile);
       return true;
     } catch (IOException e) {
       throw new IOException("Failed to move block file for " + this
@@ -367,22 +347,14 @@ abstract public class LocalReplica extends ReplicaInfo {
 
   @Override
   public boolean getPinning(LocalFileSystem localFS) throws IOException {
-    FileStatus fss =
-        localFS.getFileStatus(new Path(getBlockFile().getAbsolutePath()));
-    return fss.getPermission().getStickyBit();
+    return getPinning(localFS, new Path(getBlockFile().getAbsolutePath()));
   }
 
   @Override
   public void setPinning(LocalFileSystem localFS) throws IOException {
     File f = getBlockFile();
     Path p = new Path(f.getAbsolutePath());
-
-    FsPermission oldPermission = localFS.getFileStatus(
-        new Path(f.getAbsolutePath())).getPermission();
-    //sticky bit is used for pinning purpose
-    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
-        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
-    localFS.setPermission(p, permission);
+    setPinning(localFS, p);
   }
 
   @Override
@@ -398,7 +370,7 @@ abstract public class LocalReplica extends ReplicaInfo {
     }
     try {
       // calling renameMeta on the ReplicaInfo doesn't work here
-      NativeIO.renameTo(oldmeta, newmeta);
+      rename(oldmeta, newmeta);
     } catch (IOException e) {
       setGenerationStamp(oldGS); // restore old GS
       throw new IOException("Block " + this + " reopen failed. " +
@@ -417,7 +389,113 @@ abstract public class LocalReplica extends ReplicaInfo {
     return info.getBlockFile().compareTo(getBlockFile());
   }
 
-  static public void truncateBlock(File blockFile, File metaFile,
+  @Override
+  public void copyMetadata(URI destination) throws IOException {
+    //for local replicas, we assume the destination URI is file
+    nativeCopyFileUnbuffered(getMetaFile(), new File(destination), true);
+  }
+
+  @Override
+  public void copyBlockdata(URI destination) throws IOException {
+    //for local replicas, we assume the destination URI is file
+    nativeCopyFileUnbuffered(getBlockFile(), new File(destination), true);
+  }
+
+  public void renameMeta(File newMetaFile) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming " + getMetaFile() + " to " + newMetaFile);
+    }
+    renameFile(getMetaFile(), newMetaFile);
+  }
+
+  public void renameBlock(File newBlockFile) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming " + getBlockFile() + " to " + newBlockFile
+          + ", file length=" + getBlockFile().length());
+    }
+    renameFile(getBlockFile(), newBlockFile);
+  }
+
+  public static void rename(File from, File to) throws IOException {
+    Storage.rename(from, to);
+  }
+
+  /**
+   * Get input stream for a local file and optionally seek to the offset.
+   * @param f path to the file
+   * @param seekOffset offset to seek
+   * @return input stream for read
+   * @throws IOException
+   */
+  private FileInputStream getDataInputStream(File f, long seekOffset)
+      throws IOException {
+    FileInputStream fis;
+    if (NativeIO.isAvailable()) {
+      fis = NativeIO.getShareDeleteFileInputStream(f, seekOffset);
+    } else {
+      try {
+        fis = FsDatasetUtil.openAndSeek(f, seekOffset);
+      } catch (FileNotFoundException fnfe) {
+        throw new IOException("Expected block file at " + f +
+            " does not exist.");
+      }
+    }
+    return fis;
+  }
+
+  private void nativeCopyFileUnbuffered(File srcFile, File destFile,
+      boolean preserveFileDate) throws IOException {
+    Storage.nativeCopyFileUnbuffered(srcFile, destFile, preserveFileDate);
+  }
+
+  private void copyBytes(InputStream in, OutputStream out, int
+      buffSize) throws IOException{
+    IOUtils.copyBytes(in, out, buffSize);
+  }
+
+  private void replaceFile(File src, File target) throws IOException {
+    FileUtil.replaceFile(src, target);
+  }
+
+  public static boolean fullyDelete(final File dir) {
+    boolean result = DataStorage.fullyDelete(dir);
+    return result;
+  }
+
+  public static int getHardLinkCount(File fileName) throws IOException {
+    int linkCount = HardLink.getLinkCount(fileName);
+    return linkCount;
+  }
+
+  /**
+   *  Get pin status of a file by checking the sticky bit.
+   * @param localFS local file system
+   * @param path path to be checked
+   * @return true if the file is pinned with sticky bit
+   * @throws IOException
+   */
+  public boolean getPinning(LocalFileSystem localFS, Path path) throws
+      IOException {
+    boolean stickyBit =
+        localFS.getFileStatus(path).getPermission().getStickyBit();
+    return stickyBit;
+  }
+
+  /**
+   * Set sticky bit on path to pin file.
+   * @param localFS local file system
+   * @param path path to be pinned with sticky bit
+   * @throws IOException
+   */
+  public void setPinning(LocalFileSystem localFS, Path path) throws
+      IOException {
+    FsPermission oldPermission = localFS.getFileStatus(path).getPermission();
+    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
+        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
+    localFS.setPermission(path, permission);
+  }
+
+  public static void truncateBlock(File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
     LOG.info("truncateBlock: blockFile=" + blockFile
         + ", metaFile=" + metaFile
@@ -467,19 +545,4 @@ abstract public class LocalReplica extends ReplicaInfo {
       metaRAF.close();
     }
   }
-
-  @Override
-  public void copyMetadata(URI destination) throws IOException {
-    //for local replicas, we assume the destination URI is file
-    Storage.nativeCopyFileUnbuffered(getMetaFile(),
-        new File(destination), true);
-  }
-
-  @Override
-  public void copyBlockdata(URI destination) throws IOException {
-    //for local replicas, we assume the destination URI is file
-    Storage.nativeCopyFileUnbuffered(getBlockFile(),
-        new File(destination), true);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
index bc7bc6d..1387155 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 
@@ -246,7 +245,8 @@ public class LocalReplicaInPipeline extends LocalReplica
 
   @Override // ReplicaInPipeline
   public ReplicaOutputStreams createStreams(boolean isCreate,
-      DataChecksum requestedChecksum) throws IOException {
+      DataChecksum requestedChecksum, long slowLogThresholdMs)
+      throws IOException {
     File blockFile = getBlockFile();
     File metaFile = getMetaFile();
     if (DataNode.LOG.isDebugEnabled()) {
@@ -313,7 +313,7 @@ public class LocalReplicaInPipeline extends LocalReplica
         crcOut.getChannel().position(crcDiskSize);
       }
       return new ReplicaOutputStreams(blockOut, crcOut, checksum,
-          getVolume().isTransientStorage());
+          getVolume().isTransientStorage(), slowLogThresholdMs);
     } catch (IOException e) {
       IOUtils.closeStream(blockOut);
       IOUtils.closeStream(metaRAF);
@@ -373,40 +373,30 @@ public class LocalReplicaInPipeline extends LocalReplica
           + " should be derived from LocalReplica");
     }
 
-    LocalReplica localReplica = (LocalReplica) oldReplicaInfo;
-
-    File oldmeta = localReplica.getMetaFile();
+    LocalReplica oldReplica = (LocalReplica) oldReplicaInfo;
+    File oldmeta = oldReplica.getMetaFile();
     File newmeta = getMetaFile();
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
-    }
     try {
-      NativeIO.renameTo(oldmeta, newmeta);
+      oldReplica.renameMeta(newmeta);
     } catch (IOException e) {
       throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
                             " Unable to move meta file  " + oldmeta +
                             " to rbw dir " + newmeta, e);
     }
 
-    File blkfile = localReplica.getBlockFile();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming " + blkfile + " to " + newBlkFile
-          + ", file length=" + blkfile.length());
-    }
     try {
-      NativeIO.renameTo(blkfile, newBlkFile);
+      oldReplica.renameBlock(newBlkFile);
     } catch (IOException e) {
       try {
-        NativeIO.renameTo(newmeta, oldmeta);
+        renameMeta(oldmeta);
       } catch (IOException ex) {
         LOG.warn("Cannot move meta file " + newmeta +
             "back to the finalized directory " + oldmeta, ex);
       }
       throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
-                              " Unable to move block file " + blkfile +
-                              " to rbw dir " + newBlkFile, e);
+          " Unable to move block file " + oldReplica.getBlockFile() +
+          " to rbw dir " + newBlkFile, e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index efa6ea6..5fdbec0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -69,11 +69,13 @@ public interface ReplicaInPipeline extends Replica {
    *
    * @param isCreate if it is for creation
    * @param requestedChecksum the checksum the writer would prefer to use
+   * @param slowLogThresholdMs slow io threshold for logging
    * @return output streams for writing
    * @throws IOException if any error occurs
    */
   public ReplicaOutputStreams createStreams(boolean isCreate,
-      DataChecksum requestedChecksum) throws IOException;
+      DataChecksum requestedChecksum, long slowLogThresholdMs)
+      throws IOException;
 
   /**
    * Create an output stream to write restart metadata in case of datanode

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 57ec2b4..30f045f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
-import java.io.FileDescriptor;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -605,7 +604,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * submit a sync_file_range request to AsyncDiskService.
    */
   void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
-      final FileDescriptor fd, final long offset, final long nbytes,
+      final ReplicaOutputStreams outs, final long offset, final long nbytes,
       final int flags);
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
index 227179d..54d0e96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
@@ -18,24 +18,45 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 
 import java.io.Closeable;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.InputStream;
+import java.io.IOException;
 
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIOException;
+import org.slf4j.Logger;
 
 /**
  * Contains the input streams for the data and checksum of a replica.
  */
 public class ReplicaInputStreams implements Closeable {
-  private final InputStream dataIn;
-  private final InputStream checksumIn;
-  private final FsVolumeReference volumeRef;
+  public static final Logger LOG = DataNode.LOG;
+
+  private InputStream dataIn;
+  private InputStream checksumIn;
+  private FsVolumeReference volumeRef;
+  private FileDescriptor dataInFd = null;
 
   /** Create an object with a data input stream and a checksum input stream. */
-  public ReplicaInputStreams(InputStream dataStream, InputStream checksumStream,
-      FsVolumeReference volumeRef) {
+  public ReplicaInputStreams(InputStream dataStream,
+      InputStream checksumStream, FsVolumeReference volumeRef) {
     this.volumeRef = volumeRef;
     this.dataIn = dataStream;
     this.checksumIn = checksumStream;
+    if (dataIn instanceof FileInputStream) {
+      try {
+        dataInFd = ((FileInputStream) dataIn).getFD();
+      } catch (Exception e) {
+        LOG.warn("Could not get file descriptor for inputstream of class " +
+            this.dataIn.getClass());
+      }
+    } else {
+      LOG.debug("Could not get file descriptor for inputstream of class " +
+          this.dataIn.getClass());
+    }
   }
 
   /** @return the data input stream. */
@@ -48,10 +69,81 @@ public class ReplicaInputStreams implements Closeable {
     return checksumIn;
   }
 
+  public FileDescriptor getDataInFd() {
+    return dataInFd;
+  }
+
+  public FsVolumeReference getVolumeRef() {
+    return volumeRef;
+  }
+
+  public void readDataFully(byte[] buf, int off, int len)
+      throws IOException {
+    IOUtils.readFully(dataIn, buf, off, len);
+  }
+
+  public void readChecksumFully(byte[] buf, int off, int len)
+      throws IOException {
+    IOUtils.readFully(checksumIn, buf, off, len);
+  }
+
+  public void skipDataFully(long len) throws IOException {
+    IOUtils.skipFully(dataIn, len);
+  }
+
+  public void skipChecksumFully(long len) throws IOException {
+    IOUtils.skipFully(checksumIn, len);
+  }
+
+  public void closeChecksumStream() throws IOException {
+    IOUtils.closeStream(checksumIn);
+    checksumIn = null;
+  }
+
+  public void dropCacheBehindReads(String identifier, long offset, long len,
+      int flags) throws NativeIOException {
+    assert this.dataInFd != null : "null dataInFd!";
+    NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+        identifier, dataInFd, offset, len, flags);
+  }
+
+  public void closeStreams() throws IOException {
+    IOException ioe = null;
+    if(checksumIn!=null) {
+      try {
+        checksumIn.close(); // close checksum file
+      } catch (IOException e) {
+        ioe = e;
+      }
+      checksumIn = null;
+    }
+    if(dataIn!=null) {
+      try {
+        dataIn.close(); // close data file
+      } catch (IOException e) {
+        ioe = e;
+      }
+      dataIn = null;
+      dataInFd = null;
+    }
+    if (volumeRef != null) {
+      IOUtils.cleanup(null, volumeRef);
+      volumeRef = null;
+    }
+    // throw IOException if there is any
+    if(ioe!= null) {
+      throw ioe;
+    }
+  }
+
   @Override
   public void close() {
     IOUtils.closeStream(dataIn);
+    dataIn = null;
+    dataInFd = null;
     IOUtils.closeStream(checksumIn);
+    checksumIn = null;
     IOUtils.cleanup(null, volumeRef);
+    volumeRef = null;
   }
 }
\ No newline at end of file
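
For the read path the same wrapping applies: BlockSender and BlockPoolSlice now construct a ReplicaInputStreams and let it own the two streams, the data FileDescriptor, and the volume reference. Below is a rough usage sketch against the new class; the file paths are invented, and null is passed for the volume reference only to keep the sketch standalone (real callers pass the reference obtained from the replica's volume, as in the hunks above).

import java.io.FileInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;

public class ReplicaInputStreamsSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder paths; a real DataNode resolves these from the replica.
    FileInputStream blockIn = new FileInputStream("/tmp/blk_1001");
    FileInputStream checksumIn = new FileInputStream("/tmp/blk_1001.meta");
    // null volume reference only for this sketch.
    try (ReplicaInputStreams ris =
             new ReplicaInputStreams(blockIn, checksumIn, null)) {
      byte[] header = new byte[7];
      ris.readChecksumFully(header, 0, header.length); // meta-file header bytes
      byte[] data = new byte[4096];
      ris.readDataFully(data, 0, data.length);  // assumes the block has >= 4 KB
    } // close() releases both streams, the cached fd, and the volume reference
  }
}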

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
index bd1461a..a66847a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
@@ -18,32 +18,62 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 
 import java.io.Closeable;
+import java.io.FileDescriptor;
 import java.io.FileOutputStream;
 import java.io.OutputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIOException;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
 
 /**
  * Contains the output streams for the data and checksum of a replica.
  */
 public class ReplicaOutputStreams implements Closeable {
-  private final OutputStream dataOut;
+  public static final Logger LOG = DataNode.LOG;
+
+  private FileDescriptor outFd = null;
+  /** Stream to block. */
+  private OutputStream dataOut;
+  /** Stream to checksum. */
   private final OutputStream checksumOut;
   private final DataChecksum checksum;
   private final boolean isTransientStorage;
+  private final long slowLogThresholdMs;
 
   /**
    * Create an object with a data output stream, a checksum output stream
    * and a checksum.
    */
-  public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut,
-      DataChecksum checksum, boolean isTransientStorage) {
+  public ReplicaOutputStreams(OutputStream dataOut,
+      OutputStream checksumOut, DataChecksum checksum,
+      boolean isTransientStorage, long slowLogThresholdMs) {
     this.dataOut = dataOut;
-    this.checksumOut = checksumOut;
     this.checksum = checksum;
+    this.slowLogThresholdMs = slowLogThresholdMs;
     this.isTransientStorage = isTransientStorage;
+    this.checksumOut = checksumOut;
+
+    try {
+      if (this.dataOut instanceof FileOutputStream) {
+        this.outFd = ((FileOutputStream)this.dataOut).getFD();
+      } else {
+        LOG.debug("Could not get file descriptor for outputstream of class " +
+            this.dataOut.getClass());
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not get file descriptor for outputstream of class " +
+          this.dataOut.getClass());
+    }
+  }
+
+  public FileDescriptor getOutFd() {
+    return outFd;
   }
 
   /** @return the data output stream. */
@@ -72,12 +102,17 @@ public class ReplicaOutputStreams implements Closeable {
     IOUtils.closeStream(checksumOut);
   }
 
+  public void closeDataStream() throws IOException {
+    dataOut.close();
+    dataOut = null;
+  }
+
   /**
    * Sync the data stream if it supports it.
    */
   public void syncDataOut() throws IOException {
     if (dataOut instanceof FileOutputStream) {
-      ((FileOutputStream)dataOut).getChannel().force(true);
+      sync((FileOutputStream)dataOut);
     }
   }
   
@@ -86,8 +121,68 @@ public class ReplicaOutputStreams implements Closeable {
    */
   public void syncChecksumOut() throws IOException {
     if (checksumOut instanceof FileOutputStream) {
-      ((FileOutputStream)checksumOut).getChannel().force(true);
+      sync((FileOutputStream)checksumOut);
     }
   }
 
+  /**
+   * Flush the data stream if it supports it.
+   */
+  public void flushDataOut() throws IOException {
+    flush(dataOut);
+  }
+
+  /**
+   * Flush the checksum stream if it supports it.
+   */
+  public void flushChecksumOut() throws IOException {
+    flush(checksumOut);
+  }
+
+  private void flush(OutputStream dos) throws IOException {
+    long begin = Time.monotonicNow();
+    dos.flush();
+    long duration = Time.monotonicNow() - begin;
+    LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration);
+    if (duration > slowLogThresholdMs) {
+      LOG.warn("Slow flush took {} ms (threshold={} ms)", duration,
+          slowLogThresholdMs);
+    }
+  }
+
+  private void sync(FileOutputStream fos) throws IOException {
+    long begin = Time.monotonicNow();
+    fos.getChannel().force(true);
+    long duration = Time.monotonicNow() - begin;
+    LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration);
+    if (duration > slowLogThresholdMs) {
+      LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration,
+          slowLogThresholdMs);
+    }
+  }
+
+  public long writeToDisk(byte[] b, int off, int len) throws IOException {
+    long begin = Time.monotonicNow();
+    dataOut.write(b, off, len);
+    long duration = Time.monotonicNow() - begin;
+    LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration);
+    if (duration > slowLogThresholdMs) {
+      LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " +
+          "(threshold={} ms)", duration, slowLogThresholdMs);
+    }
+    return duration;
+  }
+
+  public void syncFileRangeIfPossible(long offset, long nbytes,
+      int flags) throws NativeIOException {
+    assert this.outFd != null : "null outFd!";
+    NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags);
+  }
+
+  public void dropCacheBehindWrites(String identifier,
+      long offset, long len, int flags) throws NativeIOException {
+    assert this.outFd != null : "null outFd!";
+    NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+        identifier, outFd, offset, len, flags);
+  }
 }
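
On the write side, LocalReplicaInPipeline#createStreams now builds this wrapper with the slow-log threshold, and BlockReceiver only ever talks to it. A rough usage sketch against the new constructor and helpers follows; the paths, threshold, and checksum parameters are made up for illustration (in the DataNode they come from createStreams() and DNConf).

import java.io.FileOutputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.util.DataChecksum;

public class ReplicaOutputStreamsSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder files standing in for the block file and its meta file.
    FileOutputStream blockOut = new FileOutputStream("/tmp/blk_1001");
    FileOutputStream crcOut = new FileOutputStream("/tmp/blk_1001.meta");
    DataChecksum checksum =
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    ReplicaOutputStreams streams = new ReplicaOutputStreams(
        blockOut, crcOut, checksum, false /* isTransientStorage */,
        300 /* slowLogThresholdMs, e.g. DNConf#getSlowIoWarningThresholdMs() */);
    byte[] packet = new byte[4096];
    long elapsedMs = streams.writeToDisk(packet, 0, packet.length); // warns if slow
    streams.flushDataOut();   // flush is timed against the same threshold
    streams.syncDataOut();    // fsync of the block file, also timed
    streams.close();          // closes both data and checksum streams
  }
}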

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 29dbb29..63e82f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -49,11 +49,13 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
+
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
@@ -145,7 +147,7 @@ class BlockPoolSlice {
     //
     this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
     if (tmpDir.exists()) {
-      FileUtil.fullyDelete(tmpDir);
+      DataStorage.fullyDelete(tmpDir);
     }
     this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
     if (!rbwDir.mkdirs()) {  // create rbw directory if not exist
@@ -436,7 +438,7 @@ class BlockPoolSlice {
 
           final File targetMetaFile = new File(targetDir, metaFile.getName());
           try {
-            NativeIO.renameTo(metaFile, targetMetaFile);
+            LocalReplica.rename(metaFile, targetMetaFile);
           } catch (IOException e) {
             LOG.warn("Failed to move meta file from "
                 + metaFile + " to " + targetMetaFile, e);
@@ -446,7 +448,7 @@ class BlockPoolSlice {
 
           final File targetBlockFile = new File(targetDir, blockFile.getName());
           try {
-            NativeIO.renameTo(blockFile, targetBlockFile);
+            LocalReplica.rename(blockFile, targetBlockFile);
           } catch (IOException e) {
             LOG.warn("Failed to move block file from "
                 + blockFile + " to " + targetBlockFile, e);
@@ -688,8 +690,6 @@ class BlockPoolSlice {
    * @return the number of valid bytes
    */
   private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
-    DataInputStream checksumIn = null;
-    InputStream blockIn = null;
     try {
       final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp);
       long blockFileLen = blockFile.length();
@@ -699,57 +699,52 @@ class BlockPoolSlice {
           !metaFile.exists() || metaFileLen < crcHeaderLen) {
         return 0;
       }
-      checksumIn = new DataInputStream(
+      try (DataInputStream checksumIn = new DataInputStream(
           new BufferedInputStream(new FileInputStream(metaFile),
-              ioFileBufferSize));
-
-      // read and handle the common header here. For now just a version
-      final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
-          checksumIn, metaFile);
-      int bytesPerChecksum = checksum.getBytesPerChecksum();
-      int checksumSize = checksum.getChecksumSize();
-      long numChunks = Math.min(
-          (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum,
-          (metaFileLen - crcHeaderLen)/checksumSize);
-      if (numChunks == 0) {
-        return 0;
-      }
-      IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize);
-      blockIn = new FileInputStream(blockFile);
-      long lastChunkStartPos = (numChunks-1)*bytesPerChecksum;
-      IOUtils.skipFully(blockIn, lastChunkStartPos);
-      int lastChunkSize = (int)Math.min(
-          bytesPerChecksum, blockFileLen-lastChunkStartPos);
-      byte[] buf = new byte[lastChunkSize+checksumSize];
-      checksumIn.readFully(buf, lastChunkSize, checksumSize);
-      IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
-
-      checksum.update(buf, 0, lastChunkSize);
-      long validFileLength;
-      if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
-        validFileLength = lastChunkStartPos + lastChunkSize;
-      } else { // last chunck is corrupt
-        validFileLength = lastChunkStartPos;
-      }
-
-      // truncate if extra bytes are present without CRC
-      if (blockFile.length() > validFileLength) {
-        RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
-        try {
-          // truncate blockFile
-          blockRAF.setLength(validFileLength);
-        } finally {
-          blockRAF.close();
+              ioFileBufferSize))) {
+        // read and handle the common header here. For now just a version
+        final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+            checksumIn, metaFile);
+        int bytesPerChecksum = checksum.getBytesPerChecksum();
+        int checksumSize = checksum.getChecksumSize();
+        long numChunks = Math.min(
+            (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,
+            (metaFileLen - crcHeaderLen) / checksumSize);
+        if (numChunks == 0) {
+          return 0;
+        }
+        try (InputStream blockIn = new FileInputStream(blockFile);
+             ReplicaInputStreams ris = new ReplicaInputStreams(blockIn,
+                 checksumIn, volume.obtainReference())) {
+          ris.skipChecksumFully((numChunks - 1) * checksumSize);
+          long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
+          ris.skipDataFully(lastChunkStartPos);
+          int lastChunkSize = (int) Math.min(
+              bytesPerChecksum, blockFileLen - lastChunkStartPos);
+          byte[] buf = new byte[lastChunkSize + checksumSize];
+          ris.readChecksumFully(buf, lastChunkSize, checksumSize);
+          ris.readDataFully(buf, 0, lastChunkSize);
+          checksum.update(buf, 0, lastChunkSize);
+          long validFileLength;
+          if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
+            validFileLength = lastChunkStartPos + lastChunkSize;
+          } else { // last chunk is corrupt
+            validFileLength = lastChunkStartPos;
+          }
+          // truncate if extra bytes are present without CRC
+          if (blockFile.length() > validFileLength) {
+            try (RandomAccessFile blockRAF =
+                     new RandomAccessFile(blockFile, "rw")) {
+              // truncate blockFile
+              blockRAF.setLength(validFileLength);
+            }
+          }
+          return validFileLength;
         }
       }
-
-      return validFileLength;
     } catch (IOException e) {
       FsDatasetImpl.LOG.warn(e);
       return 0;
-    } finally {
-      IOUtils.closeStream(checksumIn);
-      IOUtils.closeStream(blockIn);
     }
   }
 

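Editor's note on the hunk above: the rewritten validateIntegrityAndSetLength hands both the block and checksum streams (plus a volume reference) to a single ReplicaInputStreams and lets try-with-resources do the cleanup that the removed finally/closeStream block used to do. The following stand-in is a minimal sketch of that ownership pattern only; IntegrityStreams is hypothetical and is not the real ReplicaInputStreams class.

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.io.IOUtils;

// Hypothetical stand-in for the pattern above: one Closeable owns the
// data stream and the checksum stream, so a single try-with-resources
// statement replaces the manual finally-based cleanup.
class IntegrityStreams implements Closeable {
  private final InputStream dataIn;
  private final DataInputStream checksumIn;

  IntegrityStreams(InputStream dataIn, DataInputStream checksumIn) {
    this.dataIn = dataIn;
    this.checksumIn = checksumIn;
  }

  void skipDataFully(long len) throws IOException {
    IOUtils.skipFully(dataIn, len);
  }

  void skipChecksumFully(long len) throws IOException {
    IOUtils.skipFully(checksumIn, len);
  }

  @Override
  public void close() {
    // Closing the wrapper releases both underlying streams.
    IOUtils.closeStream(checksumIn);
    IOUtils.closeStream(dataIn);
  }
}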
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index b9c731b..97dcf8d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
-import java.io.FileDescriptor;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,9 +37,9 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIOException;
 
 /**
@@ -202,13 +201,13 @@ class FsDatasetAsyncDiskService {
   }
 
   public void submitSyncFileRangeRequest(FsVolumeImpl volume,
-      final FileDescriptor fd, final long offset, final long nbytes,
+      final ReplicaOutputStreams streams, final long offset, final long nbytes,
       final int flags) {
     execute(volume, new Runnable() {
       @Override
       public void run() {
         try {
-          NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags);
+          streams.syncFileRangeIfPossible(offset, nbytes, flags);
         } catch (NativeIOException e) {
           LOG.warn("sync_file_range error", e);
         }

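Editor's note: the async disk service no longer receives a raw FileDescriptor; it asks the ReplicaOutputStreams wrapper to issue the sync_file_range call itself. The wrapper's implementation is not part of this hunk, so the class below is only a plausible sketch of that delegation, under the assumption that the wrapper keeps the FileOutputStream it was built with.

import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.nativeio.NativeIO;

// Hypothetical sketch only: how a stream wrapper could forward
// sync_file_range to NativeIO when it holds the underlying
// FileOutputStream. The real ReplicaOutputStreams may differ.
class SyncingStreams {
  private final FileOutputStream dataOut;

  SyncingStreams(FileOutputStream dataOut) {
    this.dataOut = dataOut;
  }

  void syncFileRangeIfPossible(long offset, long nbytes, int flags)
      throws IOException {
    // Same native call the caller previously made directly with the fd.
    NativeIO.POSIX.syncFileRangeIfPossible(dataOut.getFD(), offset,
        nbytes, flags);
  }
}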
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 954d6ef..6065df2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -21,7 +21,6 @@ import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.File;
-import java.io.FileDescriptor;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -2755,9 +2754,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> 
{
 
   @Override
   public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
-      FileDescriptor fd, long offset, long nbytes, int flags) {
+      ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
     FsVolumeImpl fsVolumeImpl = this.getVolume(block);
-    asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset,
+    asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, outs, offset,
         nbytes, flags);
   }
 

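Editor's note: on the caller side, the dataset API change above means a writer now passes its ReplicaOutputStreams wrapper (rather than a file descriptor) when scheduling a background sync. A minimal caller-side sketch, with BackgroundSyncExample as a hypothetical helper:

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;

// Hypothetical caller sketch: the dataset forwards this request to its
// async disk service, which invokes streams.syncFileRangeIfPossible()
// on a worker thread bound to the volume holding the block.
class BackgroundSyncExample {
  static void scheduleSync(FsDatasetSpi<?> dataset, ExtendedBlock block,
      ReplicaOutputStreams streams, long offset, long nbytes, int flags) {
    dataset.submitBackgroundSyncFileRangeRequest(block, streams, offset,
        nbytes, flags);
  }
}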
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index a231e03..08564de 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -1067,7 +1067,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
         DataStorage.STORAGE_DIR_LAZY_PERSIST);
     File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
     if (force) {
-      FileUtil.fullyDelete(bpDir);
+      DataStorage.fullyDelete(bpDir);
     } else {
       if (!rbwDir.delete()) {
         throw new IOException("Failed to delete " + rbwDir);
@@ -1081,7 +1081,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
           !FileUtil.fullyDelete(lazypersistDir)))) {
         throw new IOException("Failed to delete " + lazypersistDir);
       }
-      FileUtil.fullyDelete(tmpDir);
+      DataStorage.fullyDelete(tmpDir);
       for (File f : FileUtil.listFiles(bpCurrentDir)) {
         if (!f.delete()) {
           throw new IOException("Failed to delete " + f);
@@ -1437,4 +1437,3 @@ public class FsVolumeImpl implements FsVolumeSpi {
         replicaState);
   }
 }
-

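Editor's note: recursive deletes of block pool directories now go through DataStorage.fullyDelete instead of FileUtil.fullyDelete directly. The helper's body is not in this hunk; assuming it is a thin wrapper so datanode directory deletion funnels through one instrumentable method, it would look roughly like the sketch below.

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileUtil;

// Assumed shape only: delegate to FileUtil and surface failure as an
// IOException. The real DataStorage.fullyDelete added by this patch
// may differ in details.
class FullyDeleteSketch {
  static void fullyDelete(final File dir) throws IOException {
    boolean deleted = FileUtil.fullyDelete(dir);
    if (!deleted) {
      throw new IOException("Failed to delete " + dir.getCanonicalPath());
    }
  }
}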
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index 20cec6a..e963d41 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -701,7 +701,7 @@ public class TestFileAppend{
       ReplicaBeingWritten rbw =
           (ReplicaBeingWritten)replicaHandler.getReplica();
       ReplicaOutputStreams
-          outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM);
+          outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300);
       OutputStream dataOutput = outputStreams.getDataOut();
 
       byte[] appendBytes = new byte[1];

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 5d63d07..ae52905 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
-import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -261,14 +260,15 @@ public class SimulatedFSDataset implements 
FsDatasetSpi<FsVolumeSpi> {
 
     @Override
     synchronized public ReplicaOutputStreams createStreams(boolean isCreate, 
-        DataChecksum requestedChecksum) throws IOException {
+        DataChecksum requestedChecksum, long slowLogThresholdMs)
+        throws IOException {
       if (finalized) {
         throw new IOException("Trying to write to a finalized replica "
             + theBlock);
       } else {
         SimulatedOutputStream crcStream = new SimulatedOutputStream();
         return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
-            volume.isTransientStorage());
+            volume.isTransientStorage(), slowLogThresholdMs);
       }
     }
 
@@ -1364,7 +1364,7 @@ public class SimulatedFSDataset implements 
FsDatasetSpi<FsVolumeSpi> {
 
   @Override
   public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
-      FileDescriptor fd, long offset, long nbytes, int flags) {
+      ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 619eda0..8439991 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -673,7 +673,7 @@ public class TestBlockRecovery {
     ReplicaOutputStreams streams = null;
     try {
       streams = replicaInfo.createStreams(true,
-          DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
+          DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
       streams.getChecksumOut().write('a');
       dn.data.initReplicaRecovery(new RecoveringBlock(block, null, 
RECOVERY_ID+1));
       BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 4e724bc7..fa980c2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -83,7 +83,7 @@ public class TestSimulatedFSDataset {
       ReplicaInPipeline bInfo = fsdataset.createRbw(
           StorageType.DEFAULT, b, false).getReplica();
       ReplicaOutputStreams out = bInfo.createStreams(true,
-          DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
+          DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
       try {
         OutputStream dataOut  = out.getDataOut();
         assertEquals(0, fsdataset.getLength(b));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 4166346..2417c9d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -318,8 +318,8 @@ public class ExternalDatasetImpl implements 
FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, 
FileDescriptor fd, long offset, long nbytes, int flags) {
-
+  public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
+      ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
index 90c3b8a..6fa2830 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
@@ -58,8 +58,10 @@ public class ExternalReplicaInPipeline implements 
ReplicaInPipeline {
 
   @Override
   public ReplicaOutputStreams createStreams(boolean isCreate,
-      DataChecksum requestedChecksum) throws IOException {
-    return new ReplicaOutputStreams(null, null, requestedChecksum, false);
+      DataChecksum requestedChecksum, long slowLogThresholdMs)
+      throws IOException {
+    return new ReplicaOutputStreams(null, null, requestedChecksum, false,
+        slowLogThresholdMs);
   }
 
   @Override


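Editor's note: the last several hunks all reflect the same interface change, createStreams gaining a slow-IO logging threshold (the tests above pass 300, and the parameter name slowLogThresholdMs suggests milliseconds). A minimal caller sketch under those assumptions, with CreateStreamsExample as a hypothetical helper class:

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.util.DataChecksum;

// Hypothetical caller sketch: pass the slow-IO threshold when creating
// the output streams, then write through the returned wrapper.
class CreateStreamsExample {
  static void writeOneByte(ReplicaInPipeline replica) throws IOException {
    DataChecksum checksum =
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512);
    long slowLogThresholdMs = 300;
    try (ReplicaOutputStreams streams =
             replica.createStreams(true, checksum, slowLogThresholdMs)) {
      OutputStream dataOut = streams.getDataOut();
      dataOut.write('a');
    }
  }
}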