HDFS-6955. DN should reserve disk space for a full block when creating tmp files (Contributed by Kanaka Kumar Avvaru)
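For context: before this change the DataNode reserved disk space only for replicas being written (RBW), so a block streaming in for re-replication could still exhaust the disk mid-transfer. The fix reserves a full block's worth of space when the temporary replica file is created and rolls the reservation back if creation fails. A minimal, self-contained sketch of that reserve-then-create pattern (VolumeSketch and FileFactory are illustrative stand-ins, not Hadoop APIs):

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only, not the Hadoop API: models the
// reserve-then-create pattern this patch applies to tmp replica files.
class VolumeSketch {

  interface FileFactory {
    File create() throws IOException;
  }

  // Disk space promised to replicas currently open for write.
  private final AtomicLong reservedForReplicas = new AtomicLong(0L);

  File createTmpFile(long blockSize, FileFactory factory) throws IOException {
    // Reserve a full block up front so concurrent writers cannot
    // oversubscribe the disk while this replica is still growing.
    reservedForReplicas.addAndGet(blockSize);
    try {
      return factory.create();
    } catch (IOException e) {
      // Roll the reservation back if the tmp file could not be created.
      reservedForReplicas.addAndGet(-blockSize);
      throw e;
    }
  }
}

The diff below wires this same pattern into FsVolumeImpl.createTmpFile(), mirroring what createRbwFile() already did for RBW replicas.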
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/92c1af16 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/92c1af16 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/92c1af16 Branch: refs/heads/HDFS-7966 Commit: 92c1af1646b1d91a2ab7821e4f7d450e3b6e10bb Parents: a7201d6 Author: Vinayakumar B <vinayakum...@apache.org> Authored: Fri Sep 18 16:37:10 2015 +0530 Committer: Vinayakumar B <vinayakum...@apache.org> Committed: Fri Sep 18 16:37:10 2015 +0530 ---------------------------------------------------------------------- .../hdfs/server/datanode/BlockReceiver.java | 5 +- .../server/datanode/fsdataset/FsVolumeSpi.java | 8 +- .../datanode/fsdataset/impl/FsDatasetImpl.java | 13 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 72 ++- .../server/datanode/SimulatedFSDataset.java | 2 +- .../server/datanode/TestDirectoryScanner.java | 2 +- .../datanode/extdataset/ExternalVolumeImpl.java | 2 +- .../fsdataset/impl/TestRbwSpaceReservation.java | 452 --------------- .../fsdataset/impl/TestSpaceReservation.java | 576 +++++++++++++++++++ 9 files changed, 637 insertions(+), 495 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index bc5396f..957b2c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -117,7 +117,7 @@ class BlockReceiver implements Closeable { /** the block to receive */ private final ExtendedBlock block; /** the replica to write */ - private final ReplicaInPipelineInterface replicaInfo; + private ReplicaInPipelineInterface replicaInfo; /** pipeline stage */ private final BlockConstructionStage stage; private final boolean isTransfer; @@ -259,6 +259,9 @@ class BlockReceiver implements Closeable { } catch (ReplicaNotFoundException bne) { throw bne; } catch(IOException ioe) { + if (replicaInfo != null) { + replicaInfo.releaseAllBytesReserved(); + } IOUtils.closeStream(this); cleanupBlock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index ee01924..9e16121 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -62,13 +62,13 @@ public interface FsVolumeSpi { boolean isTransientStorage(); /** - * Reserve disk space for an RBW block so a writer does not run out of - * space before the block is full. 
+ * Reserve disk space for a block (RBW or Re-replicating) + * so a writer does not run out of space before the block is full. */ - void reserveSpaceForRbw(long bytesToReserve); + void reserveSpaceForReplica(long bytesToReserve); /** - * Release disk space previously reserved for RBW block. + * Release disk space previously reserved for block opened for write. */ void releaseReservedSpace(long bytesToRelease); http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 8722d35..32eb724 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1157,7 +1157,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // Replace finalized replica by a RBW replica in replicas map volumeMap.add(bpid, newReplicaInfo); - v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes()); + v.reserveSpaceForReplica(estimateBlockLen - replicaInfo.getNumBytes()); return newReplicaInfo; } @@ -1487,7 +1487,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, - f.getParentFile(), 0); + f.getParentFile(), b.getLocalBlock().getNumBytes()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return new ReplicaHandler(newReplicaInfo, ref); } else { @@ -1604,7 +1604,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) { // remove from volumeMap volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock()); - + // delete the on-disk temp file if (delBlockFromDisk(replicaInfo.getBlockFile(), replicaInfo.getMetaFile(), b.getLocalBlock())) { @@ -2555,14 +2555,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { final long usedSpace; // size of space used by HDFS final long freeSpace; // size of free space excluding reserved space final long reservedSpace; // size of space reserved for non-HDFS - final long reservedSpaceForRBW; // size of space reserved RBW + final long reservedSpaceForReplicas; // size of space reserved RBW or + // re-replication VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) { this.directory = v.toString(); this.usedSpace = usedSpace; this.freeSpace = freeSpace; this.reservedSpace = v.getReserved(); - this.reservedSpaceForRBW = v.getReservedForRbw(); + this.reservedSpaceForReplicas = v.getReservedForReplicas(); } } @@ -2596,7 +2597,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { innerInfo.put("usedSpace", v.usedSpace); innerInfo.put("freeSpace", v.freeSpace); innerInfo.put("reservedSpace", v.reservedSpace); - innerInfo.put("reservedSpaceForRBW", v.reservedSpaceForRBW); + innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas); info.put(v.directory, innerInfo); } return info; 
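The release side of this accounting (in the FsVolumeImpl hunk that follows) decrements the counter with a compare-and-set loop that clamps at zero, so a stray double release can never make the volume advertise more free space than it really has. A standalone sketch of that loop, assuming nothing beyond java.util.concurrent:

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the clamp-at-zero release loop, reduced to a
// plain AtomicLong with no volume bookkeeping around it.
class ReleaseSketch {
  private final AtomicLong reserved = new AtomicLong(0L);

  void releaseReservedSpace(long bytesToRelease) {
    if (bytesToRelease <= 0) {
      return;
    }
    long oldReservation;
    long newReservation;
    do {
      oldReservation = reserved.get();
      // Failsafe: never let the counter go negative, which would make
      // the volume advertise more free space than it actually has.
      newReservation = Math.max(0L, oldReservation - bytesToRelease);
    } while (!reserved.compareAndSet(oldReservation, newReservation));
  }
}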
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index e90f5d2..8fd52c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -22,8 +22,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; -import java.nio.channels.ClosedChannelException; import java.io.OutputStreamWriter; +import java.nio.channels.ClosedChannelException; import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; @@ -40,9 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DF; @@ -54,21 +51,24 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; -import org.apache.hadoop.util.CloseableReferenceCount; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.CloseableReferenceCount; import org.apache.hadoop.util.DiskChecker.DiskErrorException; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.util.Time; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * The underlying volume used to store replica. * @@ -90,8 +90,9 @@ public class FsVolumeImpl implements FsVolumeSpi { private final long reserved; private CloseableReferenceCount reference = new CloseableReferenceCount(); - // Disk space reserved for open blocks. - private AtomicLong reservedForRbw; + // Disk space reserved for blocks (RBW or Re-replicating) open for write. + private AtomicLong reservedForReplicas; + private long recentReserved = 0; // Capacity configured. This is useful when we want to // limit the visible capacity for tests. 
If negative, then we just @@ -113,8 +114,8 @@ public class FsVolumeImpl implements FsVolumeSpi { this.reserved = conf.getLong( DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT); - this.reservedForRbw = new AtomicLong(0L); - this.currentDir = currentDir; + this.reservedForReplicas = new AtomicLong(0L); + this.currentDir = currentDir; File parent = currentDir.getParentFile(); this.usage = new DF(parent, conf); this.storageType = storageType; @@ -353,8 +354,9 @@ public class FsVolumeImpl implements FsVolumeSpi { */ @Override public long getAvailable() throws IOException { - long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get(); - long available = usage.getAvailable() - reserved - reservedForRbw.get(); + long remaining = getCapacity() - getDfsUsed() - reservedForReplicas.get(); + long available = usage.getAvailable() - reserved + - reservedForReplicas.get(); if (remaining > available) { remaining = available; } @@ -362,10 +364,15 @@ public class FsVolumeImpl implements FsVolumeSpi { } @VisibleForTesting - public long getReservedForRbw() { - return reservedForRbw.get(); + public long getReservedForReplicas() { + return reservedForReplicas.get(); } - + + @VisibleForTesting + long getRecentReserved() { + return recentReserved; + } + long getReserved(){ return reserved; } @@ -412,13 +419,20 @@ public class FsVolumeImpl implements FsVolumeSpi { */ File createTmpFile(String bpid, Block b) throws IOException { checkReference(); - return getBlockPoolSlice(bpid).createTmpFile(b); + reserveSpaceForReplica(b.getNumBytes()); + try { + return getBlockPoolSlice(bpid).createTmpFile(b); + } catch (IOException exception) { + releaseReservedSpace(b.getNumBytes()); + throw exception; + } } @Override - public void reserveSpaceForRbw(long bytesToReserve) { + public void reserveSpaceForReplica(long bytesToReserve) { if (bytesToReserve != 0) { - reservedForRbw.addAndGet(bytesToReserve); + reservedForReplicas.addAndGet(bytesToReserve); + recentReserved = bytesToReserve; } } @@ -428,14 +442,15 @@ public class FsVolumeImpl implements FsVolumeSpi { long oldReservation, newReservation; do { - oldReservation = reservedForRbw.get(); + oldReservation = reservedForReplicas.get(); newReservation = oldReservation - bytesToRelease; if (newReservation < 0) { - // Failsafe, this should never occur in practice, but if it does we don't - // want to start advertising more space than we have available. + // Failsafe, this should never occur in practice, but if it does we + // don't want to start advertising more space than we have available. newReservation = 0; } - } while (!reservedForRbw.compareAndSet(oldReservation, newReservation)); + } while (!reservedForReplicas.compareAndSet(oldReservation, + newReservation)); } } @@ -779,7 +794,7 @@ public class FsVolumeImpl implements FsVolumeSpi { */ File createRbwFile(String bpid, Block b) throws IOException { checkReference(); - reserveSpaceForRbw(b.getNumBytes()); + reserveSpaceForReplica(b.getNumBytes()); try { return getBlockPoolSlice(bpid).createRbwFile(b); } catch (IOException exception) { @@ -790,16 +805,15 @@ public class FsVolumeImpl implements FsVolumeSpi { /** * - * @param bytesReservedForRbw Space that was reserved during + * @param bytesReserved Space that was reserved during * block creation. Now that the block is being finalized we * can free up this space. 
* @return * @throws IOException */ - File addFinalizedBlock(String bpid, Block b, - File f, long bytesReservedForRbw) + File addFinalizedBlock(String bpid, Block b, File f, long bytesReserved) throws IOException { - releaseReservedSpace(bytesReservedForRbw); + releaseReservedSpace(bytesReserved); return getBlockPoolSlice(bpid).addFinalizedBlock(b, f); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 5d1b31a..acbd8a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -492,7 +492,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { } @Override - public void reserveSpaceForRbw(long bytesToReserve) { + public void reserveSpaceForReplica(long bytesToReserve) { } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index 9b942b7..baf50d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -612,7 +612,7 @@ public class TestDirectoryScanner { } @Override - public void reserveSpaceForRbw(long bytesToReserve) { + public void reserveSpaceForReplica(long bytesToReserve) { } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java index 3242ff7..985a259 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java @@ -74,7 +74,7 @@ public class ExternalVolumeImpl implements FsVolumeSpi { } @Override - public void reserveSpaceForRbw(long bytesToReserve) { + public void reserveSpaceForReplica(long bytesToReserve) { } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java deleted file mode 100644 index a647d96..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java +++ /dev/null @@ -1,452 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; - -import com.google.common.base.Supplier; -import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.conf.Configuration; -import static org.apache.hadoop.hdfs.DFSConfigKeys.*; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.*; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.Daemon; -import org.apache.log4j.Level; -import org.junit.After; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.Mockito; - -import java.io.IOException; -import java.io.OutputStream; -import java.lang.management.ManagementFactory; -import java.lang.reflect.Field; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeoutException; - -import javax.management.MBeanServer; -import javax.management.ObjectName; - -/** - * Ensure that the DN reserves disk space equivalent to a full block for - * replica being written (RBW). 
- */ -public class TestRbwSpaceReservation { - static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class); - - private static final int DU_REFRESH_INTERVAL_MSEC = 500; - private static final int STORAGES_PER_DATANODE = 1; - private static final int BLOCK_SIZE = 1024 * 1024; - private static final int SMALL_BLOCK_SIZE = 1024; - - protected MiniDFSCluster cluster; - private Configuration conf; - private DistributedFileSystem fs = null; - private DFSClient client = null; - FsVolumeReference singletonVolumeRef = null; - FsVolumeImpl singletonVolume = null; - - private static Random rand = new Random(); - - private void initConfig(int blockSize) { - conf = new HdfsConfiguration(); - - // Refresh disk usage information frequently. - conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC); - conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize); - - // Disable the scanner - conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); - } - - static { - ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL); - ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL); - } - - /** - * - * @param blockSize - * @param perVolumeCapacity limit the capacity of each volume to the given - * value. If negative, then don't limit. - * @throws IOException - */ - private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException { - initConfig(blockSize); - - cluster = new MiniDFSCluster - .Builder(conf) - .storagesPerDatanode(STORAGES_PER_DATANODE) - .numDataNodes(numDatanodes) - .build(); - fs = cluster.getFileSystem(); - client = fs.getClient(); - cluster.waitActive(); - - if (perVolumeCapacity >= 0) { - try (FsDatasetSpi.FsVolumeReferences volumes = - cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) { - singletonVolumeRef = volumes.get(0).obtainReference(); - } - singletonVolume = ((FsVolumeImpl) singletonVolumeRef.getVolume()); - singletonVolume.setCapacityForTesting(perVolumeCapacity); - } - } - - @After - public void shutdownCluster() throws IOException { - if (singletonVolumeRef != null) { - singletonVolumeRef.close(); - singletonVolumeRef = null; - } - - if (client != null) { - client.close(); - client = null; - } - - if (fs != null) { - fs.close(); - fs = null; - } - - if (cluster != null) { - cluster.shutdown(); - cluster = null; - } - } - - private void createFileAndTestSpaceReservation( - final String fileNamePrefix, final int fileBlockSize) - throws IOException, InterruptedException { - // Enough for 1 block + meta files + some delta. - final long configuredCapacity = fileBlockSize * 2 - 1; - startCluster(BLOCK_SIZE, 1, configuredCapacity); - FSDataOutputStream out = null; - Path path = new Path("/" + fileNamePrefix + ".dat"); - - try { - out = fs.create(path, false, 4096, (short) 1, fileBlockSize); - - byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)]; - out.write(buffer); - out.hsync(); - int bytesWritten = buffer.length; - - // Check that space was reserved for a full block minus the bytesWritten. - assertThat(singletonVolume.getReservedForRbw(), - is((long) fileBlockSize - bytesWritten)); - out.close(); - out = null; - - // Check that the reserved space has been released since we closed the - // file. - assertThat(singletonVolume.getReservedForRbw(), is(0L)); - - // Reopen the file for appends and write 1 more byte. - out = fs.append(path); - out.write(buffer); - out.hsync(); - bytesWritten += buffer.length; - - // Check that space was again reserved for a full block minus the - // bytesWritten so far. 
- assertThat(singletonVolume.getReservedForRbw(), - is((long) fileBlockSize - bytesWritten)); - - // Write once again and again verify the available space. This ensures - // that the reserved space is progressively adjusted to account for bytes - // written to disk. - out.write(buffer); - out.hsync(); - bytesWritten += buffer.length; - assertThat(singletonVolume.getReservedForRbw(), - is((long) fileBlockSize - bytesWritten)); - } finally { - if (out != null) { - out.close(); - } - } - } - - @Test (timeout=300000) - public void testWithDefaultBlockSize() - throws IOException, InterruptedException { - createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE); - } - - @Test (timeout=300000) - public void testWithNonDefaultBlockSize() - throws IOException, InterruptedException { - // Same test as previous one, but with a non-default block size. - createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2); - } - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test (timeout=300000) - public void testWithLimitedSpace() throws IOException { - // Cluster with just enough space for a full block + meta. - startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1); - final String methodName = GenericTestUtils.getMethodName(); - Path file1 = new Path("/" + methodName + ".01.dat"); - Path file2 = new Path("/" + methodName + ".02.dat"); - - // Create two files. - FSDataOutputStream os1 = null, os2 = null; - - try { - os1 = fs.create(file1); - os2 = fs.create(file2); - - // Write one byte to the first file. - byte[] data = new byte[1]; - os1.write(data); - os1.hsync(); - - // Try to write one byte to the second file. - // The block allocation must fail. - thrown.expect(RemoteException.class); - os2.write(data); - os2.hsync(); - } finally { - if (os1 != null) { - os1.close(); - } - - // os2.close() will fail as no block was allocated. - } - } - - /** - * Ensure that reserved space is released when the client goes away - * unexpectedly. - * - * The verification is done for each replica in the write pipeline. - * - * @throws IOException - */ - @Test(timeout=300000) - public void testSpaceReleasedOnUnexpectedEof() - throws IOException, InterruptedException, TimeoutException { - final short replication = 3; - startCluster(BLOCK_SIZE, replication, -1); - - final String methodName = GenericTestUtils.getMethodName(); - final Path file = new Path("/" + methodName + ".01.dat"); - - // Write 1 byte to the file and kill the writer. - FSDataOutputStream os = fs.create(file, replication); - os.write(new byte[1]); - os.hsync(); - DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream()); - - // Ensure all space reserved for the replica was released on each - // DataNode. - for (DataNode dn : cluster.getDataNodes()) { - try (FsDatasetSpi.FsVolumeReferences volumes = - dn.getFSDataset().getFsVolumeReferences()) { - final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0); - GenericTestUtils.waitFor(new Supplier<Boolean>() { - @Override - public Boolean get() { - return (volume.getReservedForRbw() == 0); - } - }, 500, Integer.MAX_VALUE); // Wait until the test times out. 
- } - } - } - - @SuppressWarnings("unchecked") - @Test(timeout = 30000) - public void testRBWFileCreationError() throws Exception { - - final short replication = 1; - startCluster(BLOCK_SIZE, replication, -1); - - final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes() - .get(0).getFSDataset().getFsVolumeReferences().get(0); - final String methodName = GenericTestUtils.getMethodName(); - final Path file = new Path("/" + methodName + ".01.dat"); - - // Mock BlockPoolSlice so that RBW file creation gives IOExcception - BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class); - Mockito.when(blockPoolSlice.createRbwFile((Block) Mockito.any())) - .thenThrow(new IOException("Synthetic IO Exception Throgh MOCK")); - - Field field = FsVolumeImpl.class.getDeclaredField("bpSlices"); - field.setAccessible(true); - Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field - .get(fsVolumeImpl); - bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice); - - try { - // Write 1 byte to the file - FSDataOutputStream os = fs.create(file, replication); - os.write(new byte[1]); - os.hsync(); - os.close(); - fail("Expecting IOException file creation failure"); - } catch (IOException e) { - // Exception can be ignored (expected) - } - - // Ensure RBW space reserved is released - assertTrue("Expected ZERO but got " + fsVolumeImpl.getReservedForRbw(), - fsVolumeImpl.getReservedForRbw() == 0); - } - - @Test(timeout = 30000) - public void testRBWInJMXBean() throws Exception { - - final short replication = 1; - startCluster(BLOCK_SIZE, replication, -1); - - final String methodName = GenericTestUtils.getMethodName(); - final Path file = new Path("/" + methodName + ".01.dat"); - - try (FSDataOutputStream os = fs.create(file, replication)) { - // Write 1 byte to the file - os.write(new byte[1]); - os.hsync(); - - final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - final ObjectName mxbeanName = new ObjectName( - "Hadoop:service=DataNode,name=DataNodeInfo"); - final String volumeInfo = (String) mbs.getAttribute(mxbeanName, - "VolumeInfo"); - - assertTrue(volumeInfo.contains("reservedSpaceForRBW")); - } - } - - /** - * Stress test to ensure we are not leaking reserved space. - * @throws IOException - * @throws InterruptedException - */ - @Test (timeout=600000) - public void stressTest() throws IOException, InterruptedException { - final int numWriters = 5; - startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10); - Writer[] writers = new Writer[numWriters]; - - // Start a few writers and let them run for a while. - for (int i = 0; i < numWriters; ++i) { - writers[i] = new Writer(client, SMALL_BLOCK_SIZE); - writers[i].start(); - } - - Thread.sleep(60000); - - // Stop the writers. - for (Writer w : writers) { - w.stopWriter(); - } - int filesCreated = 0; - int numFailures = 0; - for (Writer w : writers) { - w.join(); - filesCreated += w.getFilesCreated(); - numFailures += w.getNumFailures(); - } - - LOG.info("Stress test created " + filesCreated + - " files and hit " + numFailures + " failures"); - - // Check no space was leaked. 
- assertThat(singletonVolume.getReservedForRbw(), is(0L)); - } - - private static class Writer extends Daemon { - private volatile boolean keepRunning; - private final DFSClient localClient; - private int filesCreated = 0; - private int numFailures = 0; - byte[] data; - - Writer(DFSClient client, int blockSize) throws IOException { - localClient = client; - keepRunning = true; - filesCreated = 0; - numFailures = 0; - - // At least some of the files should span a block boundary. - data = new byte[blockSize * 2]; - } - - @Override - public void run() { - /** - * Create a file, write up to 3 blocks of data and close the file. - * Do this in a loop until we are told to stop. - */ - while (keepRunning) { - OutputStream os = null; - try { - String filename = "/file-" + rand.nextLong(); - os = localClient.create(filename, false); - os.write(data, 0, rand.nextInt(data.length)); - IOUtils.closeQuietly(os); - os = null; - localClient.delete(filename, false); - Thread.sleep(50); // Sleep for a bit to avoid killing the system. - ++filesCreated; - } catch (IOException ioe) { - // Just ignore the exception and keep going. - ++numFailures; - } catch (InterruptedException ie) { - return; - } finally { - if (os != null) { - IOUtils.closeQuietly(os); - } - } - } - } - - public void stopWriter() { - keepRunning = false; - } - - public int getFilesCreated() { - return filesCreated; - } - - public int getNumFailures() { - return numFailures; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java new file mode 100644 index 0000000..c494288 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java @@ -0,0 +1,576 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import com.google.common.base.Supplier; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Daemon; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mockito; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.management.ManagementFactory; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeoutException; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +/** + * Ensure that the DN reserves disk space equivalent to a full block for + * replica being written (RBW) & Replica being copied from another DN. + */ +public class TestSpaceReservation { + static final Log LOG = LogFactory.getLog(TestSpaceReservation.class); + + private static final int DU_REFRESH_INTERVAL_MSEC = 500; + private static final int STORAGES_PER_DATANODE = 1; + private static final int BLOCK_SIZE = 1024 * 1024; + private static final int SMALL_BLOCK_SIZE = 1024; + + protected MiniDFSCluster cluster; + private Configuration conf; + private DistributedFileSystem fs = null; + private DFSClient client = null; + FsVolumeReference singletonVolumeRef = null; + FsVolumeImpl singletonVolume = null; + + private static Random rand = new Random(); + + private void initConfig(int blockSize) { + conf = new HdfsConfiguration(); + + // Refresh disk usage information frequently. + conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC); + conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize); + + // Disable the scanner + conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); + } + + static { + ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL); + } + + /** + * + * @param blockSize + * @param perVolumeCapacity limit the capacity of each volume to the given + * value. If negative, then don't limit. 
+ * @throws IOException + */ + private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException { + initConfig(blockSize); + + cluster = new MiniDFSCluster + .Builder(conf) + .storagesPerDatanode(STORAGES_PER_DATANODE) + .numDataNodes(numDatanodes) + .build(); + fs = cluster.getFileSystem(); + client = fs.getClient(); + cluster.waitActive(); + + if (perVolumeCapacity >= 0) { + try (FsDatasetSpi.FsVolumeReferences volumes = + cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) { + singletonVolumeRef = volumes.get(0).obtainReference(); + } + singletonVolume = ((FsVolumeImpl) singletonVolumeRef.getVolume()); + singletonVolume.setCapacityForTesting(perVolumeCapacity); + } + } + + @After + public void shutdownCluster() throws IOException { + if (singletonVolumeRef != null) { + singletonVolumeRef.close(); + singletonVolumeRef = null; + } + + if (client != null) { + client.close(); + client = null; + } + + if (fs != null) { + fs.close(); + fs = null; + } + + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + private void createFileAndTestSpaceReservation( + final String fileNamePrefix, final int fileBlockSize) + throws IOException, InterruptedException { + // Enough for 1 block + meta files + some delta. + final long configuredCapacity = fileBlockSize * 2 - 1; + startCluster(BLOCK_SIZE, 1, configuredCapacity); + FSDataOutputStream out = null; + Path path = new Path("/" + fileNamePrefix + ".dat"); + + try { + out = fs.create(path, false, 4096, (short) 1, fileBlockSize); + + byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)]; + out.write(buffer); + out.hsync(); + int bytesWritten = buffer.length; + + // Check that space was reserved for a full block minus the bytesWritten. + assertThat(singletonVolume.getReservedForReplicas(), + is((long) fileBlockSize - bytesWritten)); + out.close(); + out = null; + + // Check that the reserved space has been released since we closed the + // file. + assertThat(singletonVolume.getReservedForReplicas(), is(0L)); + + // Reopen the file for appends and write 1 more byte. + out = fs.append(path); + out.write(buffer); + out.hsync(); + bytesWritten += buffer.length; + + // Check that space was again reserved for a full block minus the + // bytesWritten so far. + assertThat(singletonVolume.getReservedForReplicas(), + is((long) fileBlockSize - bytesWritten)); + + // Write once again and again verify the available space. This ensures + // that the reserved space is progressively adjusted to account for bytes + // written to disk. + out.write(buffer); + out.hsync(); + bytesWritten += buffer.length; + assertThat(singletonVolume.getReservedForReplicas(), + is((long) fileBlockSize - bytesWritten)); + } finally { + if (out != null) { + out.close(); + } + } + } + + @Test (timeout=300000) + public void testWithDefaultBlockSize() + throws IOException, InterruptedException { + createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE); + } + + @Test (timeout=300000) + public void testWithNonDefaultBlockSize() + throws IOException, InterruptedException { + // Same test as previous one, but with a non-default block size. + createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2); + } + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test (timeout=300000) + public void testWithLimitedSpace() throws IOException { + // Cluster with just enough space for a full block + meta. 
+ startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1); + final String methodName = GenericTestUtils.getMethodName(); + Path file1 = new Path("/" + methodName + ".01.dat"); + Path file2 = new Path("/" + methodName + ".02.dat"); + + // Create two files. + FSDataOutputStream os1 = null, os2 = null; + + try { + os1 = fs.create(file1); + os2 = fs.create(file2); + + // Write one byte to the first file. + byte[] data = new byte[1]; + os1.write(data); + os1.hsync(); + + // Try to write one byte to the second file. + // The block allocation must fail. + thrown.expect(RemoteException.class); + os2.write(data); + os2.hsync(); + } finally { + if (os1 != null) { + os1.close(); + } + + // os2.close() will fail as no block was allocated. + } + } + + /** + * Ensure that reserved space is released when the client goes away + * unexpectedly. + * + * The verification is done for each replica in the write pipeline. + * + * @throws IOException + */ + @Test(timeout=300000) + public void testSpaceReleasedOnUnexpectedEof() + throws IOException, InterruptedException, TimeoutException { + final short replication = 3; + startCluster(BLOCK_SIZE, replication, -1); + + final String methodName = GenericTestUtils.getMethodName(); + final Path file = new Path("/" + methodName + ".01.dat"); + + // Write 1 byte to the file and kill the writer. + FSDataOutputStream os = fs.create(file, replication); + os.write(new byte[1]); + os.hsync(); + DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream()); + + // Ensure all space reserved for the replica was released on each + // DataNode. + for (DataNode dn : cluster.getDataNodes()) { + try (FsDatasetSpi.FsVolumeReferences volumes = + dn.getFSDataset().getFsVolumeReferences()) { + final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0); + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return (volume.getReservedForReplicas() == 0); + } + }, 500, Integer.MAX_VALUE); // Wait until the test times out. 
+ } + } + } + + @SuppressWarnings("unchecked") + @Test(timeout = 30000) + public void testRBWFileCreationError() throws Exception { + + final short replication = 1; + startCluster(BLOCK_SIZE, replication, -1); + + final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes() + .get(0).getFSDataset().getFsVolumeReferences().get(0); + final String methodName = GenericTestUtils.getMethodName(); + final Path file = new Path("/" + methodName + ".01.dat"); + + // Mock BlockPoolSlice so that RBW file creation gives IOExcception + BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class); + Mockito.when(blockPoolSlice.createRbwFile((Block) Mockito.any())) + .thenThrow(new IOException("Synthetic IO Exception Throgh MOCK")); + + Field field = FsVolumeImpl.class.getDeclaredField("bpSlices"); + field.setAccessible(true); + Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field + .get(fsVolumeImpl); + bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice); + + try { + // Write 1 byte to the file + FSDataOutputStream os = fs.create(file, replication); + os.write(new byte[1]); + os.hsync(); + os.close(); + fail("Expecting IOException file creation failure"); + } catch (IOException e) { + // Exception can be ignored (expected) + } + + // Ensure RBW space reserved is released + assertTrue( + "Expected ZERO but got " + fsVolumeImpl.getReservedForReplicas(), + fsVolumeImpl.getReservedForReplicas() == 0); + + // Reserve some bytes to verify double clearing space should't happen + fsVolumeImpl.reserveSpaceForReplica(1000); + try { + // Write 1 byte to the file + FSDataOutputStream os = fs.create(new Path("/" + methodName + ".02.dat"), + replication); + os.write(new byte[1]); + os.hsync(); + os.close(); + fail("Expecting IOException file creation failure"); + } catch (IOException e) { + // Exception can be ignored (expected) + } + + // Ensure RBW space reserved is released only once + assertTrue(fsVolumeImpl.getReservedForReplicas() == 1000); + } + + @Test(timeout = 30000) + public void testReservedSpaceInJMXBean() throws Exception { + + final short replication = 1; + startCluster(BLOCK_SIZE, replication, -1); + + final String methodName = GenericTestUtils.getMethodName(); + final Path file = new Path("/" + methodName + ".01.dat"); + + try (FSDataOutputStream os = fs.create(file, replication)) { + // Write 1 byte to the file + os.write(new byte[1]); + os.hsync(); + + final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + final ObjectName mxbeanName = new ObjectName( + "Hadoop:service=DataNode,name=DataNodeInfo"); + final String volumeInfo = (String) mbs.getAttribute(mxbeanName, + "VolumeInfo"); + + // verify reserved space for Replicas in JMX bean volume info + assertTrue(volumeInfo.contains("reservedSpaceForReplicas")); + } + } + + @Test(timeout = 300000) + public void testTmpSpaceReserve() throws Exception { + + final short replication = 2; + startCluster(BLOCK_SIZE, replication, -1); + final int byteCount1 = 100; + final int byteCount2 = 200; + + final String methodName = GenericTestUtils.getMethodName(); + + // Test positive scenario + { + final Path file = new Path("/" + methodName + ".01.dat"); + + try (FSDataOutputStream os = fs.create(file, (short) 1)) { + // Write test data to the file + os.write(new byte[byteCount1]); + os.hsync(); + } + + BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10); + String firstReplicaNode = blockLocations[0].getNames()[0]; + + int newReplicaDNIndex = 0; + if 
(firstReplicaNode.equals(cluster.getDataNodes().get(0) + .getDisplayName())) { + newReplicaDNIndex = 1; + } + + FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes() + .get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0); + + performReReplication(file, true); + + assertEquals("Wrong reserve space for Tmp ", byteCount1, + fsVolumeImpl.getRecentReserved()); + + assertEquals("Reserved Tmp space is not released", 0, + fsVolumeImpl.getReservedForReplicas()); + } + + // Test when file creation fails + { + final Path file = new Path("/" + methodName + ".01.dat"); + + try (FSDataOutputStream os = fs.create(file, (short) 1)) { + // Write test data to the file + os.write(new byte[byteCount2]); + os.hsync(); + } + + BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10); + String firstReplicaNode = blockLocations[0].getNames()[0]; + + int newReplicaDNIndex = 0; + if (firstReplicaNode.equals(cluster.getDataNodes().get(0) + .getDisplayName())) { + newReplicaDNIndex = 1; + } + + BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class); + Mockito.when(blockPoolSlice.createTmpFile((Block) Mockito.any())) + .thenThrow(new IOException("Synthetic IO Exception Throgh MOCK")); + + final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes() + .get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0); + + // Reserve some bytes to verify double clearing space should't happen + fsVolumeImpl.reserveSpaceForReplica(1000); + + Field field = FsVolumeImpl.class.getDeclaredField("bpSlices"); + field.setAccessible(true); + @SuppressWarnings("unchecked") + Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field + .get(fsVolumeImpl); + bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice); + + performReReplication(file, false); + + assertEquals("Wrong reserve space for Tmp ", byteCount2, + fsVolumeImpl.getRecentReserved()); + + assertEquals("Tmp space is not released OR released twice", 1000, + fsVolumeImpl.getReservedForReplicas()); + } + } + + private void performReReplication(Path filePath, boolean waitForSuccess) + throws Exception { + fs.setReplication(filePath, (short) 2); + + Thread.sleep(4000); + BlockLocation[] blockLocations = fs.getFileBlockLocations(filePath, 0, 10); + + if (waitForSuccess) { + // Wait for the re replication + while (blockLocations[0].getNames().length < 2) { + Thread.sleep(2000); + blockLocations = fs.getFileBlockLocations(filePath, 0, 10); + } + } + } + + /** + * Stress test to ensure we are not leaking reserved space. + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=600000) + public void stressTest() throws IOException, InterruptedException { + final int numWriters = 5; + startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10); + Writer[] writers = new Writer[numWriters]; + + // Start a few writers and let them run for a while. + for (int i = 0; i < numWriters; ++i) { + writers[i] = new Writer(client, SMALL_BLOCK_SIZE); + writers[i].start(); + } + + Thread.sleep(60000); + + // Stop the writers. + for (Writer w : writers) { + w.stopWriter(); + } + int filesCreated = 0; + int numFailures = 0; + for (Writer w : writers) { + w.join(); + filesCreated += w.getFilesCreated(); + numFailures += w.getNumFailures(); + } + + LOG.info("Stress test created " + filesCreated + + " files and hit " + numFailures + " failures"); + + // Check no space was leaked. 
+ assertThat(singletonVolume.getReservedForReplicas(), is(0L)); + } + + private static class Writer extends Daemon { + private volatile boolean keepRunning; + private final DFSClient localClient; + private int filesCreated = 0; + private int numFailures = 0; + byte[] data; + + Writer(DFSClient client, int blockSize) throws IOException { + localClient = client; + keepRunning = true; + filesCreated = 0; + numFailures = 0; + + // At least some of the files should span a block boundary. + data = new byte[blockSize * 2]; + } + + @Override + public void run() { + /** + * Create a file, write up to 3 blocks of data and close the file. + * Do this in a loop until we are told to stop. + */ + while (keepRunning) { + OutputStream os = null; + try { + String filename = "/file-" + rand.nextLong(); + os = localClient.create(filename, false); + os.write(data, 0, rand.nextInt(data.length)); + IOUtils.closeQuietly(os); + os = null; + localClient.delete(filename, false); + Thread.sleep(50); // Sleep for a bit to avoid killing the system. + ++filesCreated; + } catch (IOException ioe) { + // Just ignore the exception and keep going. + ++numFailures; + } catch (InterruptedException ie) { + return; + } finally { + if (os != null) { + IOUtils.closeQuietly(os); + } + } + } + } + + public void stopWriter() { + keepRunning = false; + } + + public int getFilesCreated() { + return filesCreated; + } + + public int getNumFailures() { + return numFailures; + } + } +}
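Taken together, the reservation feeds directly into the volume's free-space arithmetic: getAvailable() (in the FsVolumeImpl hunk earlier in this diff) subtracts reservedForReplicas from both the capacity-based and the filesystem-based estimates and reports the smaller of the two. A sketch of that computation, with the real getCapacity()/getDfsUsed()/usage.getAvailable()/getReserved() calls replaced by plain parameters:

// Illustrative sketch of the getAvailable() arithmetic; all inputs are
// stand-ins for the corresponding FsVolumeImpl/DF calls.
class AvailableSketch {
  static long available(long capacity, long dfsUsed, long fsFree,
      long reservedForNonHdfs, long reservedForReplicas) {
    // Capacity-based estimate: what HDFS may use, minus what it has used,
    // minus what is promised to in-flight replicas.
    long remaining = capacity - dfsUsed - reservedForReplicas;
    // Filesystem-based estimate: what the disk actually has free, minus
    // the non-HDFS reservation and the replica reservation.
    long actuallyFree = fsFree - reservedForNonHdfs - reservedForReplicas;
    // Report the smaller of the two so the DN never over-advertises.
    return Math.min(remaining, actuallyFree);
  }
}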