HDFS-8157. Writes to RAM DISK reserve locked memory for block files. (Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e453989a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e453989a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e453989a Branch: refs/heads/HDFS-7240 Commit: e453989a5722e653bd97e3e54f9bbdffc9454fba Parents: b0ad644 Author: Arpit Agarwal <a...@apache.org> Authored: Sat May 16 09:05:35 2015 -0700 Committer: Arpit Agarwal <a...@apache.org> Committed: Sat May 16 09:05:35 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/server/datanode/ReplicaInPipeline.java | 11 +- .../hdfs/server/datanode/ReplicaInfo.java | 12 +- .../server/datanode/fsdataset/FsVolumeSpi.java | 8 + .../datanode/fsdataset/impl/BlockPoolSlice.java | 2 +- .../impl/FsDatasetAsyncDiskService.java | 7 +- .../datanode/fsdataset/impl/FsDatasetCache.java | 85 +++++++- .../datanode/fsdataset/impl/FsDatasetImpl.java | 106 ++++++---- .../datanode/fsdataset/impl/FsVolumeImpl.java | 20 +- .../impl/RamDiskReplicaLruTracker.java | 19 +- .../fsdataset/impl/RamDiskReplicaTracker.java | 12 +- .../org/apache/hadoop/hdfs/MiniDFSCluster.java | 2 +- .../hdfs/server/balancer/TestBalancer.java | 9 +- .../server/datanode/SimulatedFSDataset.java | 4 + .../server/datanode/TestDirectoryScanner.java | 9 + .../server/datanode/TestFsDatasetCache.java | 4 +- .../datanode/extdataset/ExternalVolumeImpl.java | 4 + .../fsdataset/impl/LazyPersistTestCase.java | 57 ++++-- .../impl/TestLazyPersistLockedMemory.java | 201 +++++++++++++++++++ .../fsdataset/impl/TestWriteToReplica.java | 4 +- 20 files changed, 497 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0c4d850..8c823ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -560,6 +560,9 @@ Release 2.8.0 - UNRELEASED HDFS-8394. Move getAdditionalBlock() and related functionalities into a separate class. (wheat9) + HDFS-8157. Writes to RAM DISK reserve locked memory for block files. + (Arpit Agarwal) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index cc55f85..0eb143a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -51,7 +51,8 @@ public class ReplicaInPipeline extends ReplicaInfo * the bytes already written to this block. 
*/ private long bytesReserved; - + private final long originalBytesReserved; + /** * Constructor for a zero length replica * @param blockId block id @@ -97,6 +98,7 @@ public class ReplicaInPipeline extends ReplicaInfo this.bytesOnDisk = len; this.writer = writer; this.bytesReserved = bytesToReserve; + this.originalBytesReserved = bytesToReserve; } /** @@ -109,6 +111,7 @@ public class ReplicaInPipeline extends ReplicaInfo this.bytesOnDisk = from.getBytesOnDisk(); this.writer = from.writer; this.bytesReserved = from.bytesReserved; + this.originalBytesReserved = from.originalBytesReserved; } @Override @@ -149,8 +152,14 @@ public class ReplicaInPipeline extends ReplicaInfo } @Override + public long getOriginalBytesReserved() { + return originalBytesReserved; + } + + @Override public void releaseAllBytesReserved() { // ReplicaInPipelineInterface getVolume().releaseReservedSpace(bytesReserved); + getVolume().releaseLockedMemory(bytesReserved); bytesReserved = 0; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index 940d3eb..136d8a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -218,7 +218,17 @@ abstract public class ReplicaInfo extends Block implements Replica { public long getBytesReserved() { return 0; } - + + /** + * Number of bytes originally reserved for this replica. The actual + * reservation is adjusted as data is written to disk. 
+ * + * @return the number of bytes originally reserved for this replica. + */ + public long getOriginalBytesReserved() { + return 0; + } + /** * Copy specified file into a temporary file. Then rename the * temporary file to the original name. This will cause any http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index 2a8f31b..8d1bb2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -73,6 +73,14 @@ public interface FsVolumeSpi { public void releaseReservedSpace(long bytesToRelease); /** + * Release reserved memory for an RBW block written to transient storage + * i.e. RAM. + * bytesToRelease will be rounded down to the OS page size since locked + * memory reservation must always be a multiple of the page size. + */ + public void releaseLockedMemory(long bytesToRelease); + + /** * BlockIterator will return ExtendedBlock entries from a block pool in * this volume. 
The entries will be returned in sorted order.<p/> * http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index a47d564..951c759 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -475,7 +475,7 @@ class BlockPoolSlice { // eventually. if (newReplica.getVolume().isTransientStorage()) { lazyWriteReplicaMap.addReplica(bpid, blockId, - (FsVolumeImpl) newReplica.getVolume()); + (FsVolumeImpl) newReplica.getVolume(), 0); } else { lazyWriteReplicaMap.discardReplica(bpid, blockId, false); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index c1d3990..fdc9f83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ 
-20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.File; import java.io.FileDescriptor; -import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -277,7 +276,8 @@ class FsDatasetAsyncDiskService { @Override public void run() { - long dfsBytes = blockFile.length() + metaFile.length(); + final long blockLength = blockFile.length(); + final long metaLength = metaFile.length(); boolean result; result = (trashDirectory == null) ? deleteFiles() : moveFiles(); @@ -291,7 +291,8 @@ class FsDatasetAsyncDiskService { if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){ datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID()); } - volume.decDfsUsed(block.getBlockPoolId(), dfsBytes); + volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength); + volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength); LOG.info("Deleted " + block.getBlockPoolId() + " " + block.getLocalBlock() + " file " + blockFile); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index e0df0f2..6f524b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -151,10 +151,15 @@ public class FsDatasetCache { /** * Round up a number to the operating system page size. 
*/ - public long round(long count) { - long newCount = - (count + (osPageSize - 1)) / osPageSize; - return newCount * osPageSize; + public long roundUp(long count) { + return (count + osPageSize - 1) & (~(osPageSize - 1)); + } + + /** + * Round down a number to the operating system page size. + */ + public long roundDown(long count) { + return count & (~(osPageSize - 1)); } } @@ -173,7 +178,7 @@ public class FsDatasetCache { * -1 if we failed. */ long reserve(long count) { - count = rounder.round(count); + count = rounder.roundUp(count); while (true) { long cur = usedBytes.get(); long next = cur + count; @@ -195,10 +200,23 @@ public class FsDatasetCache { * @return The new number of usedBytes. */ long release(long count) { - count = rounder.round(count); + count = rounder.roundUp(count); return usedBytes.addAndGet(-count); } - + + /** + * Release some bytes that we're using rounded down to the page size. + * + * @param count The number of bytes to release. We will round this + * down to the page size. + * + * @return The new number of usedBytes. + */ + long releaseRoundDown(long count) { + count = rounder.roundDown(count); + return usedBytes.addAndGet(-count); + } + long get() { return usedBytes.get(); } @@ -341,6 +359,52 @@ public class FsDatasetCache { } /** + * Try to reserve more bytes. + * + * @param count The number of bytes to add. We will round this + * up to the page size. + * + * @return The new number of usedBytes if we succeeded; + * -1 if we failed. + */ + long reserve(long count) { + return usedBytesCount.reserve(count); + } + + /** + * Release some bytes that we're using. + * + * @param count The number of bytes to release. We will round this + * up to the page size. + * + * @return The new number of usedBytes. + */ + long release(long count) { + return usedBytesCount.release(count); + } + + /** + * Release some bytes that we're using rounded down to the page size. + * + * @param count The number of bytes to release. 
We will round this + * down to the page size. + * + * @return The new number of usedBytes. + */ + long releaseRoundDown(long count) { + return usedBytesCount.releaseRoundDown(count); + } + + /** + * Get the OS page size. + * + * @return the OS page size. + */ + long getOsPageSize() { + return usedBytesCount.rounder.osPageSize; + } + + /** * Background worker that mmaps, mlocks, and checksums a block */ private class CachingTask implements Runnable { @@ -363,7 +427,7 @@ public class FsDatasetCache { MappableBlock mappableBlock = null; ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(), key.getBlockId(), length, genstamp); - long newUsedBytes = usedBytesCount.reserve(length); + long newUsedBytes = reserve(length); boolean reservedBytes = false; try { if (newUsedBytes < 0) { @@ -423,7 +487,7 @@ public class FsDatasetCache { IOUtils.closeQuietly(metaIn); if (!success) { if (reservedBytes) { - usedBytesCount.release(length); + release(length); } LOG.debug("Caching of {} was aborted. 
We are now caching only {} " + "bytes in total.", key, usedBytesCount.get()); @@ -502,8 +566,7 @@ public class FsDatasetCache { synchronized (FsDatasetCache.this) { mappableBlockMap.remove(key); } - long newUsedBytes = - usedBytesCount.release(value.mappableBlock.getLength()); + long newUsedBytes = release(value.mappableBlock.getLength()); numBlocksCached.addAndGet(-1); dataset.datanode.getMetrics().incrBlocksUncached(1); if (revocationTimeMs != 0) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 8725126..8ebd214 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -319,8 +319,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { cacheManager = new FsDatasetCache(this); // Start the lazy writer once we have built the replica maps. - lazyWriter = new Daemon(new LazyWriter(conf)); - lazyWriter.start(); + // We need to start the lazy writer even if MaxLockedMemory is set to + // zero because we may have un-persisted replicas in memory from before + // the process restart. To minimize the chances of data loss we'll + // ensure they get written to disk now. 
+ if (ramDiskReplicaTracker.numReplicasNotPersisted() > 0 || + datanode.getDnConf().getMaxLockedMemory() > 0) { + lazyWriter = new Daemon(new LazyWriter(conf)); + lazyWriter.start(); + } else { + lazyWriter = null; + } + registerMBean(datanode.getDatanodeUuid()); // Add a Metrics2 Source Interface. This is same @@ -1284,26 +1294,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { " and thus cannot be created."); } // create a new block - FsVolumeReference ref; - while (true) { + FsVolumeReference ref = null; + + // Use ramdisk only if block size is a multiple of OS page size. + // This simplifies reservation for partially used replicas + // significantly. + if (allowLazyPersist && + lazyWriter != null && + b.getNumBytes() % cacheManager.getOsPageSize() == 0 && + (cacheManager.reserve(b.getNumBytes())) > 0) { try { - if (allowLazyPersist) { - // First try to place the block on a transient volume. - ref = volumes.getNextTransientVolume(b.getNumBytes()); - datanode.getMetrics().incrRamDiskBlocksWrite(); - } else { - ref = volumes.getNextVolume(storageType, b.getNumBytes()); - } - } catch (DiskOutOfSpaceException de) { - if (allowLazyPersist) { - datanode.getMetrics().incrRamDiskBlocksWriteFallback(); - allowLazyPersist = false; - continue; + // First try to place the block on a transient volume. + ref = volumes.getNextTransientVolume(b.getNumBytes()); + datanode.getMetrics().incrRamDiskBlocksWrite(); + } catch(DiskOutOfSpaceException de) { + // Ignore the exception since we just fall back to persistent storage. 
+ datanode.getMetrics().incrRamDiskBlocksWriteFallback(); + } finally { + if (ref == null) { + cacheManager.release(b.getNumBytes()); } - throw de; } - break; } + + if (ref == null) { + ref = volumes.getNextVolume(storageType, b.getNumBytes()); + } + FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create an rbw file to hold block in the designated volume File f; @@ -1564,7 +1581,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile()); if (v.isTransientStorage()) { - ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v); + releaseLockedMemory( + replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(), + false); + ramDiskReplicaTracker.addReplica( + bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes()); datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes()); } } @@ -1811,9 +1832,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } /** - * We're informed that a block is no longer valid. We - * could lazily garbage-collect the block, but why bother? - * just get rid of it. + * We're informed that a block is no longer valid. Delete it. 
*/ @Override // FsDatasetSpi public void invalidate(String bpid, Block invalidBlks[]) throws IOException { @@ -2064,8 +2083,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { public void shutdown() { fsRunning = false; - ((LazyWriter) lazyWriter.getRunnable()).stop(); - lazyWriter.interrupt(); + if (lazyWriter != null) { + ((LazyWriter) lazyWriter.getRunnable()).stop(); + lazyWriter.interrupt(); + } if (mbeanName != null) { MBeans.unregister(mbeanName); @@ -2083,11 +2104,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { volumes.shutdown(); } - try { - lazyWriter.join(); - } catch (InterruptedException ie) { - LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " + - "from LazyWriter.join"); + if (lazyWriter != null) { + try { + lazyWriter.join(); + } catch (InterruptedException ie) { + LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " + + "from LazyWriter.join"); + } } } @@ -2173,7 +2196,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { diskFile.length(), diskGS, vol, diskFile.getParentFile()); volumeMap.add(bpid, diskBlockInfo); if (vol.isTransientStorage()) { - ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol); + long lockedBytesReserved = + cacheManager.reserve(diskBlockInfo.getNumBytes()) > 0 ? 
+ diskBlockInfo.getNumBytes() : 0; + ramDiskReplicaTracker.addReplica( + bpid, blockId, (FsVolumeImpl) vol, lockedBytesReserved); } LOG.warn("Added missing block to memory " + diskBlockInfo); return; @@ -2760,12 +2787,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { boolean ramDiskConfigured = ramDiskConfigured(); // Add thread for DISK volume if RamDisk is configured if (ramDiskConfigured && + asyncLazyPersistService != null && !asyncLazyPersistService.queryVolume(v.getCurrentDir())) { asyncLazyPersistService.addVolume(v.getCurrentDir()); } // Remove thread for DISK volume if RamDisk is not configured if (!ramDiskConfigured && + asyncLazyPersistService != null && asyncLazyPersistService.queryVolume(v.getCurrentDir())) { asyncLazyPersistService.removeVolume(v.getCurrentDir()); } @@ -2790,9 +2819,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // Remove the old replicas if (blockFile.delete() || !blockFile.exists()) { - ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed); + FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume(); + volume.onBlockFileDeletion(bpid, blockFileUsed); if (metaFile.delete() || !metaFile.exists()) { - ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed); + volume.onMetaFileDeletion(bpid, metaFileUsed); } } @@ -2905,8 +2935,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } /** - * Attempt to evict one or more transient block replicas we have at least - * spaceNeeded bytes free. + * Attempt to evict one or more transient block replicas until we + * have at least spaceNeeded bytes free. 
*/ private void evictBlocks() throws IOException { int iterations = 0; @@ -3056,5 +3086,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { s.add(blockId); } } + + void releaseLockedMemory(long count, boolean roundup) { + if (roundup) { + cacheManager.release(count); + } else { + cacheManager.releaseRoundDown(count); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index bc96a02..49a56bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -274,7 +274,18 @@ public class FsVolumeImpl implements FsVolumeSpi { return getBlockPoolSlice(bpid).getTmpDir(); } - void decDfsUsed(String bpid, long value) { + void onBlockFileDeletion(String bpid, long value) { + decDfsUsed(bpid, value); + if (isTransientStorage()) { + dataset.releaseLockedMemory(value, true); + } + } + + void onMetaFileDeletion(String bpid, long value) { + decDfsUsed(bpid, value); + } + + private void decDfsUsed(String bpid, long value) { synchronized(dataset) { BlockPoolSlice bp = bpSlices.get(bpid); if (bp != null) { @@ -428,6 +439,13 @@ public class FsVolumeImpl implements FsVolumeSpi { } } + @Override + public void releaseLockedMemory(long bytesToRelease) { + if (isTransientStorage()) { + dataset.releaseLockedMemory(bytesToRelease, false); + } + } + private enum SubdirFilter implements FilenameFilter { INSTANCE; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java index c01a6cf..b940736 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java @@ -38,8 +38,10 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker { private class RamDiskReplicaLru extends RamDiskReplica { long lastUsedTime; - private RamDiskReplicaLru(String bpid, long blockId, FsVolumeImpl ramDiskVolume) { - super(bpid, blockId, ramDiskVolume); + private RamDiskReplicaLru(String bpid, long blockId, + FsVolumeImpl ramDiskVolume, + long lockedBytesReserved) { + super(bpid, blockId, ramDiskVolume, lockedBytesReserved); } @Override @@ -70,20 +72,23 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker { TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted; RamDiskReplicaLruTracker() { - replicaMaps = new HashMap<String, Map<Long, RamDiskReplicaLru>>(); - replicasNotPersisted = new LinkedList<RamDiskReplicaLru>(); + replicaMaps = new HashMap<>(); + replicasNotPersisted = new LinkedList<>(); replicasPersisted = TreeMultimap.create(); } @Override synchronized void addReplica(final String bpid, final long blockId, - final FsVolumeImpl transientVolume) { + final FsVolumeImpl transientVolume, + long lockedBytesReserved) { Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid); if (map == 
null) { - map = new HashMap<Long, RamDiskReplicaLru>(); + map = new HashMap<>(); replicaMaps.put(bpid, map); } - RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume); + RamDiskReplicaLru ramDiskReplicaLru = + new RamDiskReplicaLru(bpid, blockId, transientVolume, + lockedBytesReserved); map.put(blockId, ramDiskReplicaLru); replicasNotPersisted.add(ramDiskReplicaLru); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java index 7507925..335ed70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java @@ -45,6 +45,7 @@ public abstract class RamDiskReplicaTracker { private final long blockId; private File savedBlockFile; private File savedMetaFile; + private long lockedBytesReserved; private long creationTime; protected AtomicLong numReads = new AtomicLong(0); @@ -61,10 +62,12 @@ public abstract class RamDiskReplicaTracker { FsVolumeImpl lazyPersistVolume; RamDiskReplica(final String bpid, final long blockId, - final FsVolumeImpl ramDiskVolume) { + final FsVolumeImpl ramDiskVolume, + long lockedBytesReserved) { this.bpid = bpid; this.blockId = blockId; this.ramDiskVolume = ramDiskVolume; + this.lockedBytesReserved = lockedBytesReserved; lazyPersistVolume = null; savedMetaFile = null; savedBlockFile = null; @@ -168,6 +171,10 @@ public abstract class 
RamDiskReplicaTracker { public String toString() { return "[BlockPoolID=" + bpid + "; BlockId=" + blockId + "]"; } + + public long getLockedBytesReserved() { + return lockedBytesReserved; + } } /** @@ -201,7 +208,8 @@ public abstract class RamDiskReplicaTracker { * @param transientVolume RAM disk volume that stores the replica. */ abstract void addReplica(final String bpid, final long blockId, - final FsVolumeImpl transientVolume); + final FsVolumeImpl transientVolume, + long lockedBytesReserved); /** * Invoked when a replica is opened by a client. This may be used as http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 12ad23e..fdbacdc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1582,7 +1582,7 @@ public class MiniDFSCluster { throw new IllegalStateException("Attempting to finalize " + "Namenode but it is not running"); } - ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"}); + ToolRunner.run(new DFSAdmin(conf), new String[]{"-finalizeUpgrade"}); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index e756f0b..92d31d0 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters; import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; @@ -120,13 +121,16 @@ public class TestBalancer { conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); } - static void initConfWithRamDisk(Configuration conf) { + static void initConfWithRamDisk(Configuration conf, + long ramDiskCapacity) { conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE); + conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, ramDiskCapacity); conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3); conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1); conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE); + LazyPersistTestCase.initCacheManipulator(); } /* create a file with a length of <code>fileLen</code> */ @@ -1245,7 +1249,6 @@ public class TestBalancer { final int SEED = 0xFADED; final short REPL_FACT = 1; Configuration conf = new Configuration(); - initConfWithRamDisk(conf); final int defaultRamDiskCapacity = 10; final long ramDiskStorageLimit = @@ -1255,6 +1258,8 @@ public class TestBalancer { ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) + (DEFAULT_RAM_DISK_BLOCK_SIZE - 1); + initConfWithRamDisk(conf, ramDiskStorageLimit); + cluster = new MiniDFSCluster .Builder(conf) 
.numDataNodes(1) http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 2ac9416..778dd28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -492,6 +492,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { } @Override + public void releaseLockedMemory(long bytesToRelease) { + } + + @Override public void releaseReservedSpace(long bytesToRelease) { } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index b225e35..9b942b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Test; @@ -79,6 +80,8 @@ public class TestDirectoryScanner { CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH); CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1); CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + CONF.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, + Long.MAX_VALUE); } /** create a file with a length of <code>fileLen</code> */ @@ -308,6 +311,7 @@ public class TestDirectoryScanner { @Test (timeout=300000) public void testRetainBlockOnPersistentStorage() throws Exception { + LazyPersistTestCase.initCacheManipulator(); cluster = new MiniDFSCluster .Builder(CONF) .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT }) @@ -349,6 +353,7 @@ public class TestDirectoryScanner { @Test (timeout=300000) public void testDeleteBlockOnTransientStorage() throws Exception { + LazyPersistTestCase.initCacheManipulator(); cluster = new MiniDFSCluster .Builder(CONF) .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT }) @@ -615,6 +620,10 @@ public class TestDirectoryScanner { } @Override + public void releaseLockedMemory(long bytesToRelease) { + } + + @Override public BlockIterator newBlockIterator(String bpid, String name) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index 7a09630..58932fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -339,7 +339,7 @@ public class TestFsDatasetCache { for (int i=0; i<numFiles-1; i++) { setHeartbeatResponse(cacheBlocks(fileLocs[i])); total = DFSTestUtil.verifyExpectedCacheUsage( - rounder.round(total + fileSizes[i]), 4 * (i + 1), fsd); + rounder.roundUp(total + fileSizes[i]), 4 * (i + 1), fsd); } // nth file should hit a capacity exception @@ -365,7 +365,7 @@ public class TestFsDatasetCache { int curCachedBlocks = 16; for (int i=0; i<numFiles-1; i++) { setHeartbeatResponse(uncacheBlocks(fileLocs[i])); - long uncachedBytes = rounder.round(fileSizes[i]); + long uncachedBytes = rounder.roundUp(fileSizes[i]); total -= uncachedBytes; curCachedBlocks -= uncachedBytes / BLOCK_SIZE; DFSTestUtil.verifyExpectedCacheUsage(total, curCachedBlocks, fsd); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java index ea9e4c1..3242ff7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java @@ -82,6 +82,10 @@ public class ExternalVolumeImpl implements FsVolumeSpi { } 
@Override + public void releaseLockedMemory(long bytesToRelease) { + } + + @Override public BlockIterator newBlockIterator(String bpid, String name) { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java index 5dc86f7..5ce5cc6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java @@ -23,16 +23,7 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST; import static org.apache.hadoop.fs.StorageType.DEFAULT; import static org.apache.hadoop.fs.StorageType.RAM_DISK; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY; 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC; +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -40,6 +31,7 @@ import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.EnumSet; import java.util.HashSet; @@ -68,6 +60,7 @@ import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.tools.JMXGet; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; @@ -80,8 +73,8 @@ public abstract class LazyPersistTestCase { static final byte LAZY_PERSIST_POLICY_ID = (byte) 15; static { - DFSTestUtil.setNameNodeLogLevel(Level.ALL); - GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.ALL); + DFSTestUtil.setNameNodeLogLevel(Level.DEBUG); + GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG); } protected static final int BLOCK_SIZE = 5 * 1024 * 1024; @@ -95,6 +88,8 @@ public abstract class LazyPersistTestCase { protected static final int LAZY_WRITER_INTERVAL_SEC = 1; protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class); protected static final short REPL_FACTOR = 1; + protected final long osPageSize = + NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize(); protected MiniDFSCluster cluster; protected DistributedFileSystem fs; @@ -194,7 +189,7 @@ public abstract class LazyPersistTestCase { protected final void makeRandomTestFile(Path path, long length, boolean isLazyPersist, long seed) throws IOException { 
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length, - BLOCK_SIZE, REPL_FACTOR, seed, true); + BLOCK_SIZE, REPL_FACTOR, seed, true); } protected final void makeTestFile(Path path, long length, @@ -242,10 +237,12 @@ public abstract class LazyPersistTestCase { int ramDiskReplicaCapacity, long ramDiskStorageLimit, long evictionLowWatermarkReplicas, + long maxLockedMemory, boolean useSCR, boolean useLegacyBlockReaderLocal, boolean disableScrubber) throws IOException { + initCacheManipulator(); Configuration conf = new Configuration(); conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); if (disableScrubber) { @@ -262,6 +259,7 @@ public abstract class LazyPersistTestCase { conf.setLong(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, evictionLowWatermarkReplicas * BLOCK_SIZE); conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1); + conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, maxLockedMemory); if (useSCR) { conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); @@ -311,6 +309,31 @@ public abstract class LazyPersistTestCase { LOG.info("Cluster startup complete"); } + /** + * Use a dummy cache manipulator for testing. 
+ */ + public static void initCacheManipulator() { + NativeIO.POSIX.setCacheManipulator(new NativeIO.POSIX.CacheManipulator() { + @Override + public void mlock(String identifier, + ByteBuffer mmap, long length) throws IOException { + LOG.info("LazyPersistTestCase: faking mlock of " + identifier + " bytes."); + } + + @Override + public long getMemlockLimit() { + LOG.info("LazyPersistTestCase: fake return " + Long.MAX_VALUE); + return Long.MAX_VALUE; + } + + @Override + public boolean verifyCanMlock() { + LOG.info("LazyPersistTestCase: fake return " + true); + return true; + } + }); + } + ClusterWithRamDiskBuilder getClusterBuilder() { return new ClusterWithRamDiskBuilder(); } @@ -344,6 +367,11 @@ public abstract class LazyPersistTestCase { return this; } + public ClusterWithRamDiskBuilder setMaxLockedMemory(long maxLockedMemory) { + this.maxLockedMemory = maxLockedMemory; + return this; + } + public ClusterWithRamDiskBuilder setUseScr(boolean useScr) { this.useScr = useScr; return this; @@ -376,13 +404,14 @@ public abstract class LazyPersistTestCase { LazyPersistTestCase.this.startUpCluster( numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity, ramDiskStorageLimit, evictionLowWatermarkReplicas, - useScr, useLegacyBlockReaderLocal,disableScrubber); + maxLockedMemory, useScr, useLegacyBlockReaderLocal, disableScrubber); } private int numDatanodes = REPL_FACTOR; private StorageType[] storageTypes = null; private int ramDiskReplicaCapacity = -1; private long ramDiskStorageLimit = -1; + private long maxLockedMemory = Long.MAX_VALUE; private boolean hasTransientStorage = true; private boolean useScr = false; private boolean useLegacyBlockReaderLocal = false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java new file mode 100644 index 0000000..9ea4665 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + + +import com.google.common.base.Supplier; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.MetricsAsserts; +import org.junit.Test; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.fs.CreateFlag.CREATE; +import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST; +import static org.apache.hadoop.fs.StorageType.DEFAULT; +import static org.apache.hadoop.fs.StorageType.RAM_DISK; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +/** + * Verify that locked memory is used correctly when writing to replicas in + * memory + */ +public class TestLazyPersistLockedMemory extends LazyPersistTestCase { + + /** + * RAM disk present but locked memory is set to zero. Placement should + * fall back to disk. 
+ */ + @Test + public void testWithNoLockedMemory() throws IOException { + getClusterBuilder().setNumDatanodes(1) + .setMaxLockedMemory(0).build(); + + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path path = new Path("/" + METHOD_NAME + ".dat"); + makeTestFile(path, BLOCK_SIZE, true); + ensureFileReplicasOnStorageType(path, DEFAULT); + } + + @Test + public void testReservation() + throws IOException, TimeoutException, InterruptedException { + getClusterBuilder().setNumDatanodes(1) + .setMaxLockedMemory(BLOCK_SIZE).build(); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset(); + + // Create a file and ensure the replica in RAM_DISK uses locked memory. + Path path = new Path("/" + METHOD_NAME + ".dat"); + makeTestFile(path, BLOCK_SIZE, true); + ensureFileReplicasOnStorageType(path, RAM_DISK); + assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE)); + } + + @Test + public void testReleaseOnFileDeletion() + throws IOException, TimeoutException, InterruptedException { + getClusterBuilder().setNumDatanodes(1) + .setMaxLockedMemory(BLOCK_SIZE).build(); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset(); + + Path path = new Path("/" + METHOD_NAME + ".dat"); + makeTestFile(path, BLOCK_SIZE, true); + ensureFileReplicasOnStorageType(path, RAM_DISK); + assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE)); + + // Delete the file and ensure that the locked memory is released. + fs.delete(path, false); + DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); + waitForLockedBytesUsed(fsd, 0); + } + + /** + * Verify that locked RAM is released when blocks are evicted from RAM disk. 
+ */ + @Test + public void testReleaseOnEviction() + throws IOException, TimeoutException, InterruptedException { + getClusterBuilder().setNumDatanodes(1) + .setMaxLockedMemory(BLOCK_SIZE) + .setRamDiskReplicaCapacity(BLOCK_SIZE * 2 - 1) + .build(); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset(); + + Path path = new Path("/" + METHOD_NAME + ".dat"); + makeTestFile(path, BLOCK_SIZE, true); + + // The block should get evicted soon since it pushes RAM disk free + // space below the threshold. + waitForLockedBytesUsed(fsd, 0); + + MetricsRecordBuilder rb = + MetricsAsserts.getMetrics(cluster.getDataNodes().get(0).getMetrics().name()); + MetricsAsserts.assertCounter("RamDiskBlocksEvicted", 1L, rb); + } + + /** + * Verify that locked bytes are correctly updated when a block is finalized + * at less than its max length. + */ + @Test + public void testShortBlockFinalized() + throws IOException, TimeoutException, InterruptedException { + getClusterBuilder().setNumDatanodes(1).build(); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset(); + + Path path = new Path("/" + METHOD_NAME + ".dat"); + makeTestFile(path, 1, true); + assertThat(fsd.getCacheUsed(), is(osPageSize)); + + // Delete the file and ensure locked RAM usage goes to zero. + fs.delete(path, false); + waitForLockedBytesUsed(fsd, 0); + } + + /** + * Verify that locked bytes are correctly updated when the client goes + * away unexpectedly during a write. 
+ */ + @Test + public void testWritePipelineFailure() + throws IOException, TimeoutException, InterruptedException { + getClusterBuilder().setNumDatanodes(1).build(); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset(); + + Path path = new Path("/" + METHOD_NAME + ".dat"); + + EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST); + // Write 1 byte to the file and kill the writer. + final FSDataOutputStream fos = + fs.create(path, + FsPermission.getFileDefault(), + createFlags, + BUFFER_LENGTH, + REPL_FACTOR, + BLOCK_SIZE, + null); + + fos.write(new byte[1]); + fos.hsync(); + DFSTestUtil.abortStream((DFSOutputStream) fos.getWrappedStream()); + waitForLockedBytesUsed(fsd, osPageSize); + + // Delete the file and ensure locked RAM goes to zero. + fs.delete(path, false); + DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); + waitForLockedBytesUsed(fsd, 0); + } + + /** + * Wait until used locked byte count goes to the expected value. + * @throws TimeoutException after 300 seconds. 
+ */ + private void waitForLockedBytesUsed(final FsDatasetSpi<?> fsd, + final long expectedLockedBytes) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + long cacheUsed = fsd.getCacheUsed(); + LOG.info("cacheUsed=" + cacheUsed + ", waiting for it to be " + expectedLockedBytes); + if (cacheUsed < 0) { + throw new IllegalStateException("cacheUsed unexpectedly negative"); + } + return (cacheUsed == expectedLockedBytes); + } + }, 1000, 300000); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index d5664cf..a77184b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -204,7 +204,7 @@ public class TestWriteToReplica { long available = v.getCapacity()-v.getDfsUsed(); long expectedLen = blocks[FINALIZED].getNumBytes(); try { - v.decDfsUsed(bpid, -available); + v.onBlockFileDeletion(bpid, -available); blocks[FINALIZED].setNumBytes(expectedLen+100); dataSet.append(blocks[FINALIZED], newGS, expectedLen); Assert.fail("Should not have space to append to an RWR replica" + blocks[RWR]); @@ -212,7 +212,7 @@ public class TestWriteToReplica { Assert.assertTrue(e.getMessage().startsWith( "Insufficient space for appending to ")); } - v.decDfsUsed(bpid, available); + v.onBlockFileDeletion(bpid, available); 
blocks[FINALIZED].setNumBytes(expectedLen); newGS = blocks[RBW].getGenerationStamp()+1;