HDFS-6955. DN should reserve disk space for a full block when creating tmp files (Contributed by Kanaka Kumar Avvaru)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/92c1af16
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/92c1af16
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/92c1af16

Branch: refs/heads/HDFS-7966
Commit: 92c1af1646b1d91a2ab7821e4f7d450e3b6e10bb
Parents: a7201d6
Author: Vinayakumar B <vinayakum...@apache.org>
Authored: Fri Sep 18 16:37:10 2015 +0530
Committer: Vinayakumar B <vinayakum...@apache.org>
Committed: Fri Sep 18 16:37:10 2015 +0530

----------------------------------------------------------------------
 .../hdfs/server/datanode/BlockReceiver.java     |   5 +-
 .../server/datanode/fsdataset/FsVolumeSpi.java  |   8 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  13 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  72 ++-
 .../server/datanode/SimulatedFSDataset.java     |   2 +-
 .../server/datanode/TestDirectoryScanner.java   |   2 +-
 .../datanode/extdataset/ExternalVolumeImpl.java |   2 +-
 .../fsdataset/impl/TestRbwSpaceReservation.java | 452 ---------------
 .../fsdataset/impl/TestSpaceReservation.java    | 576 +++++++++++++++++++
 9 files changed, 637 insertions(+), 495 deletions(-)
----------------------------------------------------------------------
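
In short, the patch extends the reserve-on-create / release-on-failure pattern already used for RBW files to the tmp files created during re-replication, and renames the FsVolumeSpi method from reserveSpaceForRbw to reserveSpaceForReplica. A condensed sketch of that pattern, taken from the FsVolumeImpl.createTmpFile() hunk further down:

  File createTmpFile(String bpid, Block b) throws IOException {
    checkReference();
    // Reserve space for the full block before creating the tmp file.
    reserveSpaceForReplica(b.getNumBytes());
    try {
      return getBlockPoolSlice(bpid).createTmpFile(b);
    } catch (IOException exception) {
      // Creation failed: release the reservation so no space is leaked.
      releaseReservedSpace(b.getNumBytes());
      throw exception;
    }
  }

The reservation is released again in addFinalizedBlock() once the block is finalized, or via releaseAllBytesReserved() in BlockReceiver if the write fails.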


http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index bc5396f..957b2c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -117,7 +117,7 @@ class BlockReceiver implements Closeable {
   /** the block to receive */
   private final ExtendedBlock block; 
   /** the replica to write */
-  private final ReplicaInPipelineInterface replicaInfo;
+  private ReplicaInPipelineInterface replicaInfo;
   /** pipeline stage */
   private final BlockConstructionStage stage;
   private final boolean isTransfer;
@@ -259,6 +259,9 @@ class BlockReceiver implements Closeable {
     } catch (ReplicaNotFoundException bne) {
       throw bne;
     } catch(IOException ioe) {
+      if (replicaInfo != null) {
+        replicaInfo.releaseAllBytesReserved();
+      }
       IOUtils.closeStream(this);
       cleanupBlock();
       

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index ee01924..9e16121 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -62,13 +62,13 @@ public interface FsVolumeSpi {
   boolean isTransientStorage();
 
   /**
-   * Reserve disk space for an RBW block so a writer does not run out of
-   * space before the block is full.
+   * Reserve disk space for a block (RBW or Re-replicating)
+   * so a writer does not run out of space before the block is full.
    */
-  void reserveSpaceForRbw(long bytesToReserve);
+  void reserveSpaceForReplica(long bytesToReserve);
 
   /**
-   * Release disk space previously reserved for RBW block.
+   * Release disk space previously reserved for block opened for write.
    */
   void releaseReservedSpace(long bytesToRelease);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8722d35..32eb724 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1157,7 +1157,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     
     // Replace finalized replica by a RBW replica in replicas map
     volumeMap.add(bpid, newReplicaInfo);
-    v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes());
+    v.reserveSpaceForReplica(estimateBlockLen - replicaInfo.getNumBytes());
     return newReplicaInfo;
   }
 
@@ -1487,7 +1487,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           }
           ReplicaInPipeline newReplicaInfo =
               new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
-                  f.getParentFile(), 0);
+                  f.getParentFile(), b.getLocalBlock().getNumBytes());
           volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
           return new ReplicaHandler(newReplicaInfo, ref);
         } else {
@@ -1604,7 +1604,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
       // remove from volumeMap
       volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
-      
+
       // delete the on-disk temp file
       if (delBlockFromDisk(replicaInfo.getBlockFile(), 
           replicaInfo.getMetaFile(), b.getLocalBlock())) {
@@ -2555,14 +2555,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final long usedSpace; // size of space used by HDFS
     final long freeSpace; // size of free space excluding reserved space
     final long reservedSpace; // size of space reserved for non-HDFS
-    final long reservedSpaceForRBW; // size of space reserved RBW
+    final long reservedSpaceForReplicas; // size of space reserved RBW or
+                                    // re-replication
 
     VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) {
       this.directory = v.toString();
       this.usedSpace = usedSpace;
       this.freeSpace = freeSpace;
       this.reservedSpace = v.getReserved();
-      this.reservedSpaceForRBW = v.getReservedForRbw();
+      this.reservedSpaceForReplicas = v.getReservedForReplicas();
     }
   }  
 
@@ -2596,7 +2597,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       innerInfo.put("usedSpace", v.usedSpace);
       innerInfo.put("freeSpace", v.freeSpace);
       innerInfo.put("reservedSpace", v.reservedSpace);
-      innerInfo.put("reservedSpaceForRBW", v.reservedSpaceForRBW);
+      innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas);
       info.put(v.directory, innerInfo);
     }
     return info;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index e90f5d2..8fd52c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -22,8 +22,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.nio.channels.ClosedChannelException;
 import java.io.OutputStreamWriter;
+import java.nio.channels.ClosedChannelException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
@@ -40,9 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
@@ -54,21 +51,24 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.util.Time;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * The underlying volume used to store replica.
  * 
@@ -90,8 +90,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
   private final long reserved;
   private CloseableReferenceCount reference = new CloseableReferenceCount();
 
-  // Disk space reserved for open blocks.
-  private AtomicLong reservedForRbw;
+  // Disk space reserved for blocks (RBW or Re-replicating) open for write.
+  private AtomicLong reservedForReplicas;
+  private long recentReserved = 0;
 
   // Capacity configured. This is useful when we want to
   // limit the visible capacity for tests. If negative, then we just
@@ -113,8 +114,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.reserved = conf.getLong(
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
-    this.reservedForRbw = new AtomicLong(0L);
-    this.currentDir = currentDir; 
+    this.reservedForReplicas = new AtomicLong(0L);
+    this.currentDir = currentDir;
     File parent = currentDir.getParentFile();
     this.usage = new DF(parent, conf);
     this.storageType = storageType;
@@ -353,8 +354,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
    */
   @Override
   public long getAvailable() throws IOException {
-    long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get();
-    long available = usage.getAvailable() - reserved - reservedForRbw.get();
+    long remaining = getCapacity() - getDfsUsed() - reservedForReplicas.get();
+    long available = usage.getAvailable() - reserved
+        - reservedForReplicas.get();
     if (remaining > available) {
       remaining = available;
     }
@@ -362,10 +364,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
   @VisibleForTesting
-  public long getReservedForRbw() {
-    return reservedForRbw.get();
+  public long getReservedForReplicas() {
+    return reservedForReplicas.get();
   }
-    
+
+  @VisibleForTesting
+  long getRecentReserved() {
+    return recentReserved;
+  }
+
   long getReserved(){
     return reserved;
   }
@@ -412,13 +419,20 @@ public class FsVolumeImpl implements FsVolumeSpi {
    */
   File createTmpFile(String bpid, Block b) throws IOException {
     checkReference();
-    return getBlockPoolSlice(bpid).createTmpFile(b);
+    reserveSpaceForReplica(b.getNumBytes());
+    try {
+      return getBlockPoolSlice(bpid).createTmpFile(b);
+    } catch (IOException exception) {
+      releaseReservedSpace(b.getNumBytes());
+      throw exception;
+    }
   }
 
   @Override
-  public void reserveSpaceForRbw(long bytesToReserve) {
+  public void reserveSpaceForReplica(long bytesToReserve) {
     if (bytesToReserve != 0) {
-      reservedForRbw.addAndGet(bytesToReserve);
+      reservedForReplicas.addAndGet(bytesToReserve);
+      recentReserved = bytesToReserve;
     }
   }
 
@@ -428,14 +442,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
 
       long oldReservation, newReservation;
       do {
-        oldReservation = reservedForRbw.get();
+        oldReservation = reservedForReplicas.get();
         newReservation = oldReservation - bytesToRelease;
         if (newReservation < 0) {
-          // Failsafe, this should never occur in practice, but if it does we don't
-          // want to start advertising more space than we have available.
+          // Failsafe, this should never occur in practice, but if it does we
+          // don't want to start advertising more space than we have available.
           newReservation = 0;
         }
-      } while (!reservedForRbw.compareAndSet(oldReservation, newReservation));
+      } while (!reservedForReplicas.compareAndSet(oldReservation,
+          newReservation));
     }
   }
 
@@ -779,7 +794,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
    */
   File createRbwFile(String bpid, Block b) throws IOException {
     checkReference();
-    reserveSpaceForRbw(b.getNumBytes());
+    reserveSpaceForReplica(b.getNumBytes());
     try {
       return getBlockPoolSlice(bpid).createRbwFile(b);
     } catch (IOException exception) {
@@ -790,16 +805,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
 
   /**
    *
-   * @param bytesReservedForRbw Space that was reserved during
+   * @param bytesReserved Space that was reserved during
    *     block creation. Now that the block is being finalized we
    *     can free up this space.
    * @return
    * @throws IOException
    */
-  File addFinalizedBlock(String bpid, Block b,
-                         File f, long bytesReservedForRbw)
+  File addFinalizedBlock(String bpid, Block b, File f, long bytesReserved)
       throws IOException {
-    releaseReservedSpace(bytesReservedForRbw);
+    releaseReservedSpace(bytesReserved);
     return getBlockPoolSlice(bpid).addFinalizedBlock(b, f);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 5d1b31a..acbd8a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -492,7 +492,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
 
     @Override
-    public void reserveSpaceForRbw(long bytesToReserve) {
+    public void reserveSpaceForReplica(long bytesToReserve) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 9b942b7..baf50d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -612,7 +612,7 @@ public class TestDirectoryScanner {
     }
 
     @Override
-    public void reserveSpaceForRbw(long bytesToReserve) {
+    public void reserveSpaceForReplica(long bytesToReserve) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index 3242ff7..985a259 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -74,7 +74,7 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
   }
 
   @Override
-  public void reserveSpaceForRbw(long bytesToReserve) {
+  public void reserveSpaceForReplica(long bytesToReserve) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
deleted file mode 100644
index a647d96..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-
-import com.google.common.base.Supplier;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.Daemon;
-import org.apache.log4j.Level;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.management.ManagementFactory;
-import java.lang.reflect.Field;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeoutException;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-/**
- * Ensure that the DN reserves disk space equivalent to a full block for
- * replica being written (RBW).
- */
-public class TestRbwSpaceReservation {
-  static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
-
-  private static final int DU_REFRESH_INTERVAL_MSEC = 500;
-  private static final int STORAGES_PER_DATANODE = 1;
-  private static final int BLOCK_SIZE = 1024 * 1024;
-  private static final int SMALL_BLOCK_SIZE = 1024;
-
-  protected MiniDFSCluster cluster;
-  private Configuration conf;
-  private DistributedFileSystem fs = null;
-  private DFSClient client = null;
-  FsVolumeReference singletonVolumeRef = null;
-  FsVolumeImpl singletonVolume = null;
-
-  private static Random rand = new Random();
-
-  private void initConfig(int blockSize) {
-    conf = new HdfsConfiguration();
-
-    // Refresh disk usage information frequently.
-    conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
-    conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
-
-    // Disable the scanner
-    conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
-  }
-
-  static {
-    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
-  }
-
-  /**
-   *
-   * @param blockSize
-   * @param perVolumeCapacity limit the capacity of each volume to the given
-   *                          value. If negative, then don't limit.
-   * @throws IOException
-   */
-  private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException {
-    initConfig(blockSize);
-
-    cluster = new MiniDFSCluster
-        .Builder(conf)
-        .storagesPerDatanode(STORAGES_PER_DATANODE)
-        .numDataNodes(numDatanodes)
-        .build();
-    fs = cluster.getFileSystem();
-    client = fs.getClient();
-    cluster.waitActive();
-
-    if (perVolumeCapacity >= 0) {
-      try (FsDatasetSpi.FsVolumeReferences volumes =
-          cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
-        singletonVolumeRef = volumes.get(0).obtainReference();
-      }
-      singletonVolume = ((FsVolumeImpl) singletonVolumeRef.getVolume());
-      singletonVolume.setCapacityForTesting(perVolumeCapacity);
-    }
-  }
-
-  @After
-  public void shutdownCluster() throws IOException {
-    if (singletonVolumeRef != null) {
-      singletonVolumeRef.close();
-      singletonVolumeRef = null;
-    }
-
-    if (client != null) {
-      client.close();
-      client = null;
-    }
-
-    if (fs != null) {
-      fs.close();
-      fs = null;
-    }
-
-    if (cluster != null) {
-      cluster.shutdown();
-      cluster = null;
-    }
-  }
-
-  private void createFileAndTestSpaceReservation(
-      final String fileNamePrefix, final int fileBlockSize)
-      throws IOException, InterruptedException {
-    // Enough for 1 block + meta files + some delta.
-    final long configuredCapacity = fileBlockSize * 2 - 1;
-    startCluster(BLOCK_SIZE, 1, configuredCapacity);
-    FSDataOutputStream out = null;
-    Path path = new Path("/" + fileNamePrefix + ".dat");
-
-    try {
-      out = fs.create(path, false, 4096, (short) 1, fileBlockSize);
-
-      byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)];
-      out.write(buffer);
-      out.hsync();
-      int bytesWritten = buffer.length;
-
-      // Check that space was reserved for a full block minus the bytesWritten.
-      assertThat(singletonVolume.getReservedForRbw(),
-                 is((long) fileBlockSize - bytesWritten));
-      out.close();
-      out = null;
-
-      // Check that the reserved space has been released since we closed the
-      // file.
-      assertThat(singletonVolume.getReservedForRbw(), is(0L));
-
-      // Reopen the file for appends and write 1 more byte.
-      out = fs.append(path);
-      out.write(buffer);
-      out.hsync();
-      bytesWritten += buffer.length;
-
-      // Check that space was again reserved for a full block minus the
-      // bytesWritten so far.
-      assertThat(singletonVolume.getReservedForRbw(),
-                 is((long) fileBlockSize - bytesWritten));
-
-      // Write once again and again verify the available space. This ensures
-      // that the reserved space is progressively adjusted to account for bytes
-      // written to disk.
-      out.write(buffer);
-      out.hsync();
-      bytesWritten += buffer.length;
-      assertThat(singletonVolume.getReservedForRbw(),
-                 is((long) fileBlockSize - bytesWritten));
-    } finally {
-      if (out != null) {
-        out.close();
-      }
-    }
-  }
-
-  @Test (timeout=300000)
-  public void testWithDefaultBlockSize()
-      throws IOException, InterruptedException {
-    createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE);
-  }
-
-  @Test (timeout=300000)
-  public void testWithNonDefaultBlockSize()
-      throws IOException, InterruptedException {
-    // Same test as previous one, but with a non-default block size.
-    createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2);
-  }
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test (timeout=300000)
-  public void testWithLimitedSpace() throws IOException {
-    // Cluster with just enough space for a full block + meta.
-    startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1);
-    final String methodName = GenericTestUtils.getMethodName();
-    Path file1 = new Path("/" + methodName + ".01.dat");
-    Path file2 = new Path("/" + methodName + ".02.dat");
-
-    // Create two files.
-    FSDataOutputStream os1 = null, os2 = null;
-
-    try {
-      os1 = fs.create(file1);
-      os2 = fs.create(file2);
-
-      // Write one byte to the first file.
-      byte[] data = new byte[1];
-      os1.write(data);
-      os1.hsync();
-
-      // Try to write one byte to the second file.
-      // The block allocation must fail.
-      thrown.expect(RemoteException.class);
-      os2.write(data);
-      os2.hsync();
-    } finally {
-      if (os1 != null) {
-        os1.close();
-      }
-
-      // os2.close() will fail as no block was allocated.
-    }
-  }
-
-  /**
-   * Ensure that reserved space is released when the client goes away
-   * unexpectedly.
-   *
-   * The verification is done for each replica in the write pipeline.
-   *
-   * @throws IOException
-   */
-  @Test(timeout=300000)
-  public void testSpaceReleasedOnUnexpectedEof()
-      throws IOException, InterruptedException, TimeoutException {
-    final short replication = 3;
-    startCluster(BLOCK_SIZE, replication, -1);
-
-    final String methodName = GenericTestUtils.getMethodName();
-    final Path file = new Path("/" + methodName + ".01.dat");
-
-    // Write 1 byte to the file and kill the writer.
-    FSDataOutputStream os = fs.create(file, replication);
-    os.write(new byte[1]);
-    os.hsync();
-    DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
-
-    // Ensure all space reserved for the replica was released on each
-    // DataNode.
-    for (DataNode dn : cluster.getDataNodes()) {
-      try (FsDatasetSpi.FsVolumeReferences volumes =
-          dn.getFSDataset().getFsVolumeReferences()) {
-        final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
-        GenericTestUtils.waitFor(new Supplier<Boolean>() {
-          @Override
-          public Boolean get() {
-            return (volume.getReservedForRbw() == 0);
-          }
-        }, 500, Integer.MAX_VALUE); // Wait until the test times out.
-      }
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test(timeout = 30000)
-  public void testRBWFileCreationError() throws Exception {
-
-    final short replication = 1;
-    startCluster(BLOCK_SIZE, replication, -1);
-
-    final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
-        .get(0).getFSDataset().getFsVolumeReferences().get(0);
-    final String methodName = GenericTestUtils.getMethodName();
-    final Path file = new Path("/" + methodName + ".01.dat");
-
-    // Mock BlockPoolSlice so that RBW file creation gives IOExcception
-    BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class);
-    Mockito.when(blockPoolSlice.createRbwFile((Block) Mockito.any()))
-        .thenThrow(new IOException("Synthetic IO Exception Throgh MOCK"));
-
-    Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
-    field.setAccessible(true);
-    Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field
-        .get(fsVolumeImpl);
-    bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);
-
-    try {
-      // Write 1 byte to the file
-      FSDataOutputStream os = fs.create(file, replication);
-      os.write(new byte[1]);
-      os.hsync();
-      os.close();
-      fail("Expecting IOException file creation failure");
-    } catch (IOException e) {
-      // Exception can be ignored (expected)
-    }
-
-    // Ensure RBW space reserved is released
-    assertTrue("Expected ZERO but got " + fsVolumeImpl.getReservedForRbw(),
-        fsVolumeImpl.getReservedForRbw() == 0);
-  }
-
-  @Test(timeout = 30000)
-  public void testRBWInJMXBean() throws Exception {
-
-    final short replication = 1;
-    startCluster(BLOCK_SIZE, replication, -1);
-
-    final String methodName = GenericTestUtils.getMethodName();
-    final Path file = new Path("/" + methodName + ".01.dat");
-
-    try (FSDataOutputStream os = fs.create(file, replication)) {
-      // Write 1 byte to the file
-      os.write(new byte[1]);
-      os.hsync();
-
-      final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-      final ObjectName mxbeanName = new ObjectName(
-          "Hadoop:service=DataNode,name=DataNodeInfo");
-      final String volumeInfo = (String) mbs.getAttribute(mxbeanName,
-          "VolumeInfo");
-
-      assertTrue(volumeInfo.contains("reservedSpaceForRBW"));
-    }
-  }
-
-  /**
-   * Stress test to ensure we are not leaking reserved space.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  @Test (timeout=600000)
-  public void stressTest() throws IOException, InterruptedException {
-    final int numWriters = 5;
-    startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10);
-    Writer[] writers = new Writer[numWriters];
-
-    // Start a few writers and let them run for a while.
-    for (int i = 0; i < numWriters; ++i) {
-      writers[i] = new Writer(client, SMALL_BLOCK_SIZE);
-      writers[i].start();
-    }
-
-    Thread.sleep(60000);
-
-    // Stop the writers.
-    for (Writer w : writers) {
-      w.stopWriter();
-    }
-    int filesCreated = 0;
-    int numFailures = 0;
-    for (Writer w : writers) {
-      w.join();
-      filesCreated += w.getFilesCreated();
-      numFailures += w.getNumFailures();
-    }
-
-    LOG.info("Stress test created " + filesCreated +
-             " files and hit " + numFailures + " failures");
-
-    // Check no space was leaked.
-    assertThat(singletonVolume.getReservedForRbw(), is(0L));
-  }
-
-  private static class Writer extends Daemon {
-    private volatile boolean keepRunning;
-    private final DFSClient localClient;
-    private int filesCreated = 0;
-    private int numFailures = 0;
-    byte[] data;
-
-    Writer(DFSClient client, int blockSize) throws IOException {
-      localClient = client;
-      keepRunning = true;
-      filesCreated = 0;
-      numFailures = 0;
-
-      // At least some of the files should span a block boundary.
-      data = new byte[blockSize * 2];
-    }
-
-    @Override
-    public void run() {
-      /**
-       * Create a file, write up to 3 blocks of data and close the file.
-       * Do this in a loop until we are told to stop.
-       */
-      while (keepRunning) {
-        OutputStream os = null;
-        try {
-          String filename = "/file-" + rand.nextLong();
-          os = localClient.create(filename, false);
-          os.write(data, 0, rand.nextInt(data.length));
-          IOUtils.closeQuietly(os);
-          os = null;
-          localClient.delete(filename, false);
-          Thread.sleep(50);     // Sleep for a bit to avoid killing the system.
-          ++filesCreated;
-        } catch (IOException ioe) {
-          // Just ignore the exception and keep going.
-          ++numFailures;
-        } catch (InterruptedException ie) {
-          return;
-        } finally {
-          if (os != null) {
-            IOUtils.closeQuietly(os);
-          }
-        }
-      }
-    }
-
-    public void stopWriter() {
-      keepRunning = false;
-    }
-
-    public int getFilesCreated() {
-      return filesCreated;
-    }
-
-    public int getNumFailures() {
-      return numFailures;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
new file mode 100644
index 0000000..c494288
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import com.google.common.base.Supplier;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+/**
+ * Ensure that the DN reserves disk space equivalent to a full block for a
+ * replica being written (RBW) and for a replica being copied from another DN.
+ */
+public class TestSpaceReservation {
+  static final Log LOG = LogFactory.getLog(TestSpaceReservation.class);
+
+  private static final int DU_REFRESH_INTERVAL_MSEC = 500;
+  private static final int STORAGES_PER_DATANODE = 1;
+  private static final int BLOCK_SIZE = 1024 * 1024;
+  private static final int SMALL_BLOCK_SIZE = 1024;
+
+  protected MiniDFSCluster cluster;
+  private Configuration conf;
+  private DistributedFileSystem fs = null;
+  private DFSClient client = null;
+  FsVolumeReference singletonVolumeRef = null;
+  FsVolumeImpl singletonVolume = null;
+
+  private static Random rand = new Random();
+
+  private void initConfig(int blockSize) {
+    conf = new HdfsConfiguration();
+
+    // Refresh disk usage information frequently.
+    conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
+    conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
+
+    // Disable the scanner
+    conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+  }
+
+  static {
+    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /**
+   *
+   * @param blockSize
+   * @param perVolumeCapacity limit the capacity of each volume to the given
+   *                          value. If negative, then don't limit.
+   * @throws IOException
+   */
+  private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException {
+    initConfig(blockSize);
+
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .storagesPerDatanode(STORAGES_PER_DATANODE)
+        .numDataNodes(numDatanodes)
+        .build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+    cluster.waitActive();
+
+    if (perVolumeCapacity >= 0) {
+      try (FsDatasetSpi.FsVolumeReferences volumes =
+          cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
+        singletonVolumeRef = volumes.get(0).obtainReference();
+      }
+      singletonVolume = ((FsVolumeImpl) singletonVolumeRef.getVolume());
+      singletonVolume.setCapacityForTesting(perVolumeCapacity);
+    }
+  }
+
+  @After
+  public void shutdownCluster() throws IOException {
+    if (singletonVolumeRef != null) {
+      singletonVolumeRef.close();
+      singletonVolumeRef = null;
+    }
+
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void createFileAndTestSpaceReservation(
+      final String fileNamePrefix, final int fileBlockSize)
+      throws IOException, InterruptedException {
+    // Enough for 1 block + meta files + some delta.
+    final long configuredCapacity = fileBlockSize * 2 - 1;
+    startCluster(BLOCK_SIZE, 1, configuredCapacity);
+    FSDataOutputStream out = null;
+    Path path = new Path("/" + fileNamePrefix + ".dat");
+
+    try {
+      out = fs.create(path, false, 4096, (short) 1, fileBlockSize);
+
+      byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)];
+      out.write(buffer);
+      out.hsync();
+      int bytesWritten = buffer.length;
+
+      // Check that space was reserved for a full block minus the bytesWritten.
+      assertThat(singletonVolume.getReservedForReplicas(),
+                 is((long) fileBlockSize - bytesWritten));
+      out.close();
+      out = null;
+
+      // Check that the reserved space has been released since we closed the
+      // file.
+      assertThat(singletonVolume.getReservedForReplicas(), is(0L));
+
+      // Reopen the file for appends and write 1 more byte.
+      out = fs.append(path);
+      out.write(buffer);
+      out.hsync();
+      bytesWritten += buffer.length;
+
+      // Check that space was again reserved for a full block minus the
+      // bytesWritten so far.
+      assertThat(singletonVolume.getReservedForReplicas(),
+                 is((long) fileBlockSize - bytesWritten));
+
+      // Write once again and again verify the available space. This ensures
+      // that the reserved space is progressively adjusted to account for bytes
+      // written to disk.
+      out.write(buffer);
+      out.hsync();
+      bytesWritten += buffer.length;
+      assertThat(singletonVolume.getReservedForReplicas(),
+                 is((long) fileBlockSize - bytesWritten));
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  @Test (timeout=300000)
+  public void testWithDefaultBlockSize()
+      throws IOException, InterruptedException {
+    createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE);
+  }
+
+  @Test (timeout=300000)
+  public void testWithNonDefaultBlockSize()
+      throws IOException, InterruptedException {
+    // Same test as previous one, but with a non-default block size.
+    createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2);
+  }
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test (timeout=300000)
+  public void testWithLimitedSpace() throws IOException {
+    // Cluster with just enough space for a full block + meta.
+    startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1);
+    final String methodName = GenericTestUtils.getMethodName();
+    Path file1 = new Path("/" + methodName + ".01.dat");
+    Path file2 = new Path("/" + methodName + ".02.dat");
+
+    // Create two files.
+    FSDataOutputStream os1 = null, os2 = null;
+
+    try {
+      os1 = fs.create(file1);
+      os2 = fs.create(file2);
+
+      // Write one byte to the first file.
+      byte[] data = new byte[1];
+      os1.write(data);
+      os1.hsync();
+
+      // Try to write one byte to the second file.
+      // The block allocation must fail.
+      thrown.expect(RemoteException.class);
+      os2.write(data);
+      os2.hsync();
+    } finally {
+      if (os1 != null) {
+        os1.close();
+      }
+
+      // os2.close() will fail as no block was allocated.
+    }
+  }
+
+  /**
+   * Ensure that reserved space is released when the client goes away
+   * unexpectedly.
+   *
+   * The verification is done for each replica in the write pipeline.
+   *
+   * @throws IOException
+   */
+  @Test(timeout=300000)
+  public void testSpaceReleasedOnUnexpectedEof()
+      throws IOException, InterruptedException, TimeoutException {
+    final short replication = 3;
+    startCluster(BLOCK_SIZE, replication, -1);
+
+    final String methodName = GenericTestUtils.getMethodName();
+    final Path file = new Path("/" + methodName + ".01.dat");
+
+    // Write 1 byte to the file and kill the writer.
+    FSDataOutputStream os = fs.create(file, replication);
+    os.write(new byte[1]);
+    os.hsync();
+    DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
+
+    // Ensure all space reserved for the replica was released on each
+    // DataNode.
+    for (DataNode dn : cluster.getDataNodes()) {
+      try (FsDatasetSpi.FsVolumeReferences volumes =
+          dn.getFSDataset().getFsVolumeReferences()) {
+        final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return (volume.getReservedForReplicas() == 0);
+          }
+        }, 500, Integer.MAX_VALUE); // Wait until the test times out.
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 30000)
+  public void testRBWFileCreationError() throws Exception {
+
+    final short replication = 1;
+    startCluster(BLOCK_SIZE, replication, -1);
+
+    final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
+        .get(0).getFSDataset().getFsVolumeReferences().get(0);
+    final String methodName = GenericTestUtils.getMethodName();
+    final Path file = new Path("/" + methodName + ".01.dat");
+
+    // Mock BlockPoolSlice so that RBW file creation gives IOException
+    BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class);
+    Mockito.when(blockPoolSlice.createRbwFile((Block) Mockito.any()))
+        .thenThrow(new IOException("Synthetic IO Exception Through MOCK"));
+
+    Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
+    field.setAccessible(true);
+    Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field
+        .get(fsVolumeImpl);
+    bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);
+
+    try {
+      // Write 1 byte to the file
+      FSDataOutputStream os = fs.create(file, replication);
+      os.write(new byte[1]);
+      os.hsync();
+      os.close();
+      fail("Expecting IOException file creation failure");
+    } catch (IOException e) {
+      // Exception can be ignored (expected)
+    }
+
+    // Ensure RBW space reserved is released
+    assertTrue(
+        "Expected ZERO but got " + fsVolumeImpl.getReservedForReplicas(),
+        fsVolumeImpl.getReservedForReplicas() == 0);
+
+    // Reserve some bytes to verify double clearing of space shouldn't happen
+    fsVolumeImpl.reserveSpaceForReplica(1000);
+    try {
+      // Write 1 byte to the file
+      FSDataOutputStream os = fs.create(new Path("/" + methodName + ".02.dat"),
+          replication);
+      os.write(new byte[1]);
+      os.hsync();
+      os.close();
+      fail("Expecting IOException file creation failure");
+    } catch (IOException e) {
+      // Exception can be ignored (expected)
+    }
+
+    // Ensure RBW space reserved is released only once
+    assertTrue(fsVolumeImpl.getReservedForReplicas() == 1000);
+  }
+
+  @Test(timeout = 30000)
+  public void testReservedSpaceInJMXBean() throws Exception {
+
+    final short replication = 1;
+    startCluster(BLOCK_SIZE, replication, -1);
+
+    final String methodName = GenericTestUtils.getMethodName();
+    final Path file = new Path("/" + methodName + ".01.dat");
+
+    try (FSDataOutputStream os = fs.create(file, replication)) {
+      // Write 1 byte to the file
+      os.write(new byte[1]);
+      os.hsync();
+
+      final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      final ObjectName mxbeanName = new ObjectName(
+          "Hadoop:service=DataNode,name=DataNodeInfo");
+      final String volumeInfo = (String) mbs.getAttribute(mxbeanName,
+          "VolumeInfo");
+
+      // verify reserved space for Replicas in JMX bean volume info
+      assertTrue(volumeInfo.contains("reservedSpaceForReplicas"));
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testTmpSpaceReserve() throws Exception {
+
+    final short replication = 2;
+    startCluster(BLOCK_SIZE, replication, -1);
+    final int byteCount1 = 100;
+    final int byteCount2 = 200;
+
+    final String methodName = GenericTestUtils.getMethodName();
+
+    // Test positive scenario
+    {
+      final Path file = new Path("/" + methodName + ".01.dat");
+
+      try (FSDataOutputStream os = fs.create(file, (short) 1)) {
+        // Write test data to the file
+        os.write(new byte[byteCount1]);
+        os.hsync();
+      }
+
+      BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10);
+      String firstReplicaNode = blockLocations[0].getNames()[0];
+
+      int newReplicaDNIndex = 0;
+      if (firstReplicaNode.equals(cluster.getDataNodes().get(0)
+          .getDisplayName())) {
+        newReplicaDNIndex = 1;
+      }
+
+      FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
+          .get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);
+
+      performReReplication(file, true);
+
+      assertEquals("Wrong reserve space for Tmp ", byteCount1,
+          fsVolumeImpl.getRecentReserved());
+
+      assertEquals("Reserved Tmp space is not released", 0,
+          fsVolumeImpl.getReservedForReplicas());
+    }
+
+    // Test when file creation fails
+    {
+      final Path file = new Path("/" + methodName + ".01.dat");
+
+      try (FSDataOutputStream os = fs.create(file, (short) 1)) {
+        // Write test data to the file
+        os.write(new byte[byteCount2]);
+        os.hsync();
+      }
+
+      BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10);
+      String firstReplicaNode = blockLocations[0].getNames()[0];
+
+      int newReplicaDNIndex = 0;
+      if (firstReplicaNode.equals(cluster.getDataNodes().get(0)
+          .getDisplayName())) {
+        newReplicaDNIndex = 1;
+      }
+
+      BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class);
+      Mockito.when(blockPoolSlice.createTmpFile((Block) Mockito.any()))
+          .thenThrow(new IOException("Synthetic IO Exception Through MOCK"));
+
+      final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
+          .get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);
+
+      // Reserve some bytes to verify double clearing of space shouldn't happen
+      fsVolumeImpl.reserveSpaceForReplica(1000);
+
+      Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
+      field.setAccessible(true);
+      @SuppressWarnings("unchecked")
+      Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field
+          .get(fsVolumeImpl);
+      bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);
+
+      performReReplication(file, false);
+
+      assertEquals("Wrong reserve space for Tmp ", byteCount2,
+          fsVolumeImpl.getRecentReserved());
+
+      assertEquals("Tmp space is not released OR released twice", 1000,
+          fsVolumeImpl.getReservedForReplicas());
+    }
+  }
+
+  private void performReReplication(Path filePath, boolean waitForSuccess)
+      throws Exception {
+    fs.setReplication(filePath, (short) 2);
+
+    Thread.sleep(4000);
+    BlockLocation[] blockLocations = fs.getFileBlockLocations(filePath, 0, 10);
+
+    if (waitForSuccess) {
+      // Wait for the re-replication
+      while (blockLocations[0].getNames().length < 2) {
+        Thread.sleep(2000);
+        blockLocations = fs.getFileBlockLocations(filePath, 0, 10);
+      }
+    }
+  }
+
+  /**
+   * Stress test to ensure we are not leaking reserved space.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=600000)
+  public void stressTest() throws IOException, InterruptedException {
+    final int numWriters = 5;
+    startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10);
+    Writer[] writers = new Writer[numWriters];
+
+    // Start a few writers and let them run for a while.
+    for (int i = 0; i < numWriters; ++i) {
+      writers[i] = new Writer(client, SMALL_BLOCK_SIZE);
+      writers[i].start();
+    }
+
+    Thread.sleep(60000);
+
+    // Stop the writers.
+    for (Writer w : writers) {
+      w.stopWriter();
+    }
+    int filesCreated = 0;
+    int numFailures = 0;
+    for (Writer w : writers) {
+      w.join();
+      filesCreated += w.getFilesCreated();
+      numFailures += w.getNumFailures();
+    }
+
+    LOG.info("Stress test created " + filesCreated +
+             " files and hit " + numFailures + " failures");
+
+    // Check no space was leaked.
+    assertThat(singletonVolume.getReservedForReplicas(), is(0L));
+  }
+
+  private static class Writer extends Daemon {
+    private volatile boolean keepRunning;
+    private final DFSClient localClient;
+    private int filesCreated = 0;
+    private int numFailures = 0;
+    byte[] data;
+
+    Writer(DFSClient client, int blockSize) throws IOException {
+      localClient = client;
+      keepRunning = true;
+      filesCreated = 0;
+      numFailures = 0;
+
+      // At least some of the files should span a block boundary.
+      data = new byte[blockSize * 2];
+    }
+
+    @Override
+    public void run() {
+      /**
+       * Create a file, write up to 3 blocks of data and close the file.
+       * Do this in a loop until we are told to stop.
+       */
+      while (keepRunning) {
+        OutputStream os = null;
+        try {
+          String filename = "/file-" + rand.nextLong();
+          os = localClient.create(filename, false);
+          os.write(data, 0, rand.nextInt(data.length));
+          IOUtils.closeQuietly(os);
+          os = null;
+          localClient.delete(filename, false);
+          Thread.sleep(50);     // Sleep for a bit to avoid killing the system.
+          ++filesCreated;
+        } catch (IOException ioe) {
+          // Just ignore the exception and keep going.
+          ++numFailures;
+        } catch (InterruptedException ie) {
+          return;
+        } finally {
+          if (os != null) {
+            IOUtils.closeQuietly(os);
+          }
+        }
+      }
+    }
+
+    public void stopWriter() {
+      keepRunning = false;
+    }
+
+    public int getFilesCreated() {
+      return filesCreated;
+    }
+
+    public int getNumFailures() {
+      return numFailures;
+    }
+  }
+}
