This is an automated email from the ASF dual-hosted git repository.

jing9 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 7743d40  HDFS-15549. Use Hardlink to move replica between DISK and ARCHIVE storage if on same filesystem mount (#2583). Contributed by Leon Gao.
7743d40 is described below

commit 7743d40ac5b6fba73204feba22d2256d4e9d70f0
Author: LeonGao <lia...@uber.com>
AuthorDate: Fri Jan 15 16:28:11 2021 -0800

    HDFS-15549. Use Hardlink to move replica between DISK and ARCHIVE storage if on same filesystem mount (#2583). Contributed by Leon Gao.
---
 .../main/java/org/apache/hadoop/fs/HardLink.java   |   4 +-
 .../java/org/apache/hadoop/fs/StorageType.java     |   5 +
 .../hdfs/server/datanode/DirectoryScanner.java     |   3 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     | 136 ++++++++-
 .../datanode/fsdataset/impl/FsVolumeImpl.java      |  23 +-
 .../datanode/fsdataset/impl/FsVolumeList.java      |  29 +-
 .../datanode/fsdataset/impl/TestFsDatasetImpl.java | 317 ++++++++++++++++++++-
 .../apache/hadoop/hdfs/server/mover/TestMover.java |   6 +
 8 files changed, 502 insertions(+), 21 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java
index 30f793d..887ae0c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java
@@ -153,11 +153,11 @@ public class HardLink {
    */
 
   /**
-   * Creates a hardlink
+   * Creates a hardlink.
    * @param file - existing source file
    * @param linkName - desired target link file
    */
-  public static void createHardLink(File file, File linkName)
+  public static void createHardLink(File file, File linkName)
       throws IOException {
     if (file == null) {
       throw new IOException(
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
index e11c129..b17864a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
@@ -92,6 +92,11 @@ public enum StorageType {
     return StorageType.valueOf(StringUtils.toUpperCase(s));
   }
 
+  public static boolean allowSameDiskTiering(StorageType storageType) {
+    return storageType == StorageType.DISK
+        || storageType == StorageType.ARCHIVE;
+  }
+
   private static List<StorageType> getNonTransientTypes() {
     List<StorageType> nonTransientTypes = new ArrayList<>();
     for (StorageType t : VALUES) {
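Everything below rides on one filesystem fact: a hard link is a second directory entry for the same inode, so creating it copies no bytes, and deleting either name leaves the other fully readable. A standalone sketch of that property (plain java.nio with made-up file names, not code from this patch):

    import java.nio.file.Files;
    import java.nio.file.Path;

    public class HardLinkDemo {
      public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("hardlink-demo");
        Path disk = dir.resolve("blk_1001");         // stand-in for the DISK replica
        Files.write(disk, new byte[]{1, 2, 3});

        Path archive = dir.resolve("blk_1001.arc");  // stand-in for the ARCHIVE name
        Files.createLink(archive, disk);             // same inode, no bytes copied

        Files.delete(disk);                          // drops only the old name
        System.out.println(Files.readAllBytes(archive).length);  // prints 3
      }
    }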
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index d835108..66cfa01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -322,7 +322,8 @@ public class DirectoryScanner implements Runnable {
    * Start the scanner. The scanner will run every
    * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
    */
-  void start() {
+  @VisibleForTesting
+  public void start() {
     shouldRun.set(true);
     long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index f5bfd92..c3dbf48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -48,6 +48,7 @@ import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -994,6 +995,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         smallBufferSize, conf);
   }
 
+  /**
+   * Link the block and meta files for the given block to the given destination.
+   * @return the new meta and block files.
+   * @throws IOException
+   */
+  static File[] hardLinkBlockFiles(long blockId, long genStamp,
+      ReplicaInfo srcReplica, File destRoot) throws IOException {
+    final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
+    // blockName is same as the filename for the block
+    final File dstFile = new File(destDir, srcReplica.getBlockName());
+    final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
+    return hardLinkBlockFiles(srcReplica, dstMeta, dstFile);
+  }
+
   static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta,
       File dstFile, boolean calculateChecksum,
       int smallBufferSize, final Configuration conf)
@@ -1026,6 +1041,34 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return new File[] {dstMeta, dstFile};
   }
 
+  static File[] hardLinkBlockFiles(ReplicaInfo srcReplica, File dstMeta,
+      File dstFile)
+      throws IOException {
+    // Create parent folder if not exists.
+    srcReplica.getFileIoProvider()
+        .mkdirs(srcReplica.getVolume(), dstFile.getParentFile());
+    try {
+      HardLink.createHardLink(
+          new File(srcReplica.getBlockURI()), dstFile);
+    } catch (IOException e) {
+      throw new IOException("Failed to hardLink "
+          + srcReplica + " block file to "
+          + dstFile, e);
+    }
+    try {
+      HardLink.createHardLink(
+          new File(srcReplica.getMetadataURI()), dstMeta);
+    } catch (IOException e) {
+      throw new IOException("Failed to hardLink "
+          + srcReplica + " metadata to "
+          + dstMeta, e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Linked " + srcReplica.getBlockURI() + " to " + dstFile);
+    }
+    return new File[]{dstMeta, dstFile};
+  }
+
   /**
    * Move block files from one storage to another storage.
    * @return Returns the Old replicaInfo
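A note on the names resolved above: HDFS pairs a block file blk_<blockId> with a checksum file blk_<blockId>_<genStamp>.meta, which is what FsDatasetUtil.getMetaFile computes for the link target. A minimal stand-in for that helper, shown only to make the convention concrete (not the real implementation):

    import java.io.File;

    final class MetaNaming {
      // Stand-in for FsDatasetUtil.getMetaFile: blk_<id> pairs with
      // blk_<id>_<genStamp>.meta in the same directory.
      static File metaFile(File blockFile, long genStamp) {
        return new File(blockFile.getParent(),
            blockFile.getName() + "_" + genStamp + ".meta");
      }

      public static void main(String[] args) {
        System.out.println(metaFile(new File("/data/blk_1073741825"), 1001));
        // prints /data/blk_1073741825_1001.meta
      }
    }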
@@ -1058,12 +1101,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     FsVolumeReference volumeRef = null;
+    boolean shouldConsiderSameMountVolume =
+        shouldConsiderSameMountVolume(replicaInfo.getVolume(),
+            targetStorageType, targetStorageId);
+    boolean useVolumeOnSameMount = false;
+
     try (AutoCloseableLock lock = datasetReadLock.acquire()) {
-      volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
-          block.getNumBytes());
+      if (shouldConsiderSameMountVolume) {
+        volumeRef = volumes.getVolumeByMount(targetStorageType,
+            ((FsVolumeImpl) replicaInfo.getVolume()).getMount(),
+            block.getNumBytes());
+        if (volumeRef != null) {
+          useVolumeOnSameMount = true;
+        }
+      }
+      if (!useVolumeOnSameMount) {
+        volumeRef = volumes.getNextVolume(
+            targetStorageType,
+            targetStorageId,
+            block.getNumBytes()
+        );
+      }
     }
     try {
-      moveBlock(block, replicaInfo, volumeRef);
+      moveBlock(block, replicaInfo, volumeRef, useVolumeOnSameMount);
     } finally {
       if (volumeRef != null) {
         volumeRef.close();
@@ -1075,19 +1136,53 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /**
+   * When configuring DISK/ARCHIVE on same volume,
+   * check if we should find the counterpart on the same disk mount.
+   */
+  @VisibleForTesting
+  boolean shouldConsiderSameMountVolume(FsVolumeSpi fsVolume,
+      StorageType targetStorageType, String targetStorageID) {
+    if (targetStorageID != null && !targetStorageID.isEmpty()) {
+      return false;
+    }
+    if (!(fsVolume instanceof FsVolumeImpl)
+        || ((FsVolumeImpl) fsVolume).getMount().isEmpty()) {
+      return false;
+    }
+    StorageType sourceStorageType = fsVolume.getStorageType();
+    // Source/dest storage types must be different.
+    if (sourceStorageType == targetStorageType) {
+      return false;
+    }
+    // Source/dest storage types are either DISK or ARCHIVE.
+    return StorageType.allowSameDiskTiering(sourceStorageType)
+        && StorageType.allowSameDiskTiering(targetStorageType);
+  }
+
+  /**
    * Moves a block from a given volume to another.
    *
    * @param block - Extended Block
    * @param replicaInfo - ReplicaInfo
    * @param volumeRef - Volume Ref - Closed by caller.
+   * @param moveBlockToLocalMount - Whether we use shortcut
+   *                                to move block on same mount.
    * @return newReplicaInfo
    * @throws IOException
    */
   @VisibleForTesting
   ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
-      FsVolumeReference volumeRef) throws IOException {
-    ReplicaInfo newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
-        volumeRef);
+      FsVolumeReference volumeRef, boolean moveBlockToLocalMount)
+      throws IOException {
+    ReplicaInfo newReplicaInfo;
+    if (moveBlockToLocalMount) {
+      newReplicaInfo = moveReplicaToVolumeOnSameMount(block, replicaInfo,
+          volumeRef);
+    } else {
+      newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
+          volumeRef);
+    }
+
     finalizeNewReplica(newReplicaInfo, block);
     removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
     return newReplicaInfo;
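Restated outside Hadoop (the enum and names below are stand-ins, not the real StorageType or FsVolumeImpl), the guard above takes the same-mount shortcut only when no specific target storage ID was requested, the source volume knows its mount, and the move is a genuine DISK/ARCHIVE transition:

    public class SameMountRule {
      enum St { DISK, ARCHIVE, SSD }

      static boolean shouldConsider(St source, St target,
          String targetStorageId, String mount) {
        if (targetStorageId != null && !targetStorageId.isEmpty()) {
          return false;  // caller pinned a specific storage: no shortcut
        }
        if (mount == null || mount.isEmpty()) {
          return false;  // mount unknown: cannot match volumes by mount
        }
        if (source == target) {
          return false;  // same storage type: nothing to tier
        }
        return (source == St.DISK || source == St.ARCHIVE)
            && (target == St.DISK || target == St.ARCHIVE);
      }

      public static void main(String[] args) {
        System.out.println(shouldConsider(St.DISK, St.ARCHIVE, null, "/mnt/d1"));  // true
        System.out.println(shouldConsider(St.DISK, St.ARCHIVE, "sid", "/mnt/d1")); // false
        System.out.println(shouldConsider(St.DISK, St.SSD, null, "/mnt/d1"));      // false
      }
    }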
@@ -1129,6 +1224,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /**
+   * Shortcut to use hardlink to move blocks on same mount.
+   * This is useful when moving blocks between storage types on same disk
+   * mount. Two cases need to be considered carefully:
+   * 1) Datanode restart in the middle should not cause data loss.
+   *    We use hardlink to avoid this.
+   * 2) Finalized blocks can be reopened to append.
+   *    This is already handled by dataset lock and gen stamp.
+   *    See HDFS-12942.
+   *
+   * @param block - Extended Block
+   * @param replicaInfo - ReplicaInfo
+   * @param volumeRef - Volume Ref - Closed by caller.
+   * @return newReplicaInfo new replica object created in specified volume.
+   * @throws IOException
+   */
+  @VisibleForTesting
+  ReplicaInfo moveReplicaToVolumeOnSameMount(ExtendedBlock block,
+      ReplicaInfo replicaInfo,
+      FsVolumeReference volumeRef) throws IOException {
+    FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
+    // Move files to temp dir first.
+    ReplicaInfo newReplicaInfo = targetVolume.hardLinkBlockToTmpLocation(block,
+        replicaInfo);
+    return newReplicaInfo;
+  }
+
+  /**
    * Finalizes newReplica by calling finalizeReplica internally.
    *
    * @param newReplicaInfo - ReplicaInfo
@@ -1177,7 +1299,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     try {
-      moveBlock(block, replicaInfo, volumeRef);
+      moveBlock(block, replicaInfo, volumeRef, false);
     } finally {
       if (volumeRef != null) {
         volumeRef.close();
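Case 1) in the javadoc above is the reason the link lands under the block pool's tmp directory first: if the datanode dies before the replica is finalized, restart cleanup simply deletes tmp, and hardlink semantics guarantee the still-finalized source replica is untouched. A toy model of that sequence (plain java.nio, invented paths):

    import java.nio.file.Files;
    import java.nio.file.Path;

    public class TmpLinkCrashDemo {
      public static void main(String[] args) throws Exception {
        Path root = Files.createTempDirectory("dn");
        Path finalized = Files.createDirectories(root.resolve("finalized"));
        Path tmp = Files.createDirectories(root.resolve("tmp"));

        Path src = Files.write(finalized.resolve("blk_1"), new byte[]{42});
        Files.createLink(tmp.resolve("blk_1"), src);  // step 1: link into tmp

        // "Restart": the datanode wipes everything under tmp.
        Files.delete(tmp.resolve("blk_1"));

        // The finalized replica is still fully readable.
        System.out.println(Files.readAllBytes(src)[0]);  // prints 42
      }
    }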
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index ccb76b1..07e14fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -484,9 +484,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
     // should share the same amount of reserved capacity.
     // When calculating actual non dfs used,
     // exclude DFS used capacity by another volume.
-    if (enableSameDiskTiering &&
-        (storageType == StorageType.DISK
-            || storageType == StorageType.ARCHIVE)) {
+    if (enableSameDiskTiering
+        && StorageType.allowSameDiskTiering(storageType)) {
       StorageType counterpartStorageType = storageType == StorageType.DISK ?
           StorageType.ARCHIVE : StorageType.DISK;
       FsVolumeReference counterpartRef = dataset
@@ -1529,6 +1528,24 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return newReplicaInfo;
   }
 
+  public ReplicaInfo hardLinkBlockToTmpLocation(ExtendedBlock block,
+      ReplicaInfo replicaInfo) throws IOException {
+
+    File[] blockFiles = FsDatasetImpl.hardLinkBlockFiles(block.getBlockId(),
+        block.getGenerationStamp(), replicaInfo,
+        getTmpDir(block.getBlockPoolId()));
+
+    ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
+        .setBlockId(replicaInfo.getBlockId())
+        .setGenerationStamp(replicaInfo.getGenerationStamp())
+        .setFsVolume(this)
+        .setDirectoryToUse(blockFiles[0].getParentFile())
+        .setBytesToReserve(0)
+        .build();
+    newReplicaInfo.setNumBytes(blockFiles[1].length());
+    return newReplicaInfo;
+  }
+
   public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
       long genStamp,
       ReplicaInfo replicaInfo,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 2d6593d..38cf399 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -111,6 +111,30 @@ class FsVolumeList {
     }
   }
 
+  /**
+   * Get volume by disk mount to place a block.
+   * This is useful for same disk tiering.
+   *
+   * @param storageType The desired {@link StorageType}
+   * @param mount Disk mount of the volume
+   * @param blockSize Free space needed on the volume
+   * @return a reference to the volume on the given mount with the given
+   *         storage type, or null if none qualifies
+   * @throws IOException
+   */
+  FsVolumeReference getVolumeByMount(StorageType storageType,
+      String mount, long blockSize) throws IOException {
+    if (!enableSameDiskTiering) {
+      return null;
+    }
+    FsVolumeReference volume = mountVolumeMap
+        .getVolumeRefByMountAndStorageType(mount, storageType);
+    // Check if volume has enough capacity
+    if (volume != null && volume.getVolume().getAvailable() > blockSize) {
+      return volume;
+    }
+    return null;
+  }
+
   /**
    * Get next volume.
    *
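getVolumeByMount reads an index that the same-disk-tiering feature maintains per mount point; this patch only consumes it. A hedged sketch of the shape such an index takes (names invented here, not the real MountVolumeMap API):

    import java.util.EnumMap;
    import java.util.HashMap;
    import java.util.Map;

    public class MountIndex<V> {
      enum Kind { DISK, ARCHIVE }

      // mount point -> (storage type -> volume)
      private final Map<String, EnumMap<Kind, V>> byMount = new HashMap<>();

      void put(String mount, Kind kind, V volume) {
        byMount.computeIfAbsent(mount, m -> new EnumMap<>(Kind.class))
            .put(kind, volume);
      }

      // The counterpart volume on the same mount, or null if absent.
      V get(String mount, Kind kind) {
        Map<Kind, V> volumes = byMount.get(mount);
        return volumes == null ? null : volumes.get(kind);
      }
    }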
@@ -354,9 +378,8 @@ class FsVolumeList {
    * Check if same disk tiering is applied to the volume.
    */
   private boolean isSameDiskTieringApplied(FsVolumeImpl target) {
-    return enableSameDiskTiering &&
-        (target.getStorageType() == StorageType.DISK
-            || target.getStorageType() == StorageType.ARCHIVE);
+    return enableSameDiskTiering
+        && StorageType.allowSameDiskTiering(target.getStorageType());
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 33a6c4f..80437ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.function.Supplier;
+
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 
 import java.io.OutputStream;
@@ -68,6 +71,7 @@ import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
@@ -1070,24 +1074,43 @@ public class TestFsDatasetImpl {
     }
   }
 
+  /**
+   * When moving blocks using hardlink or copy,
+   * if an append happens in the middle,
+   * the block movement should fail and the hardlink is removed.
+   */
   @Test(timeout = 30000)
   public void testMoveBlockFailure() {
+    // Test copy
+    testMoveBlockFailure(conf);
+    // Test hardlink
+    conf.setBoolean(DFSConfigKeys
+        .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+    conf.setDouble(DFSConfigKeys
+        .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+    testMoveBlockFailure(conf);
+  }
+
+  private void testMoveBlockFailure(Configuration config) {
     MiniDFSCluster cluster = null;
     try {
+
       cluster = new MiniDFSCluster.Builder(conf)
           .numDataNodes(1)
-          .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
+          .storageTypes(
+              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
           .storagesPerDatanode(2)
           .build();
       FileSystem fs = cluster.getFileSystem();
       DataNode dataNode = cluster.getDataNodes().get(0);
 
       Path filePath = new Path("testData");
-      DFSTestUtil.createFile(fs, filePath, 100, (short) 1, 0);
-      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      long fileLen = 100;
+      ExtendedBlock block = createTestFile(fs, fileLen, filePath);
 
       FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
-      ReplicaInfo newReplicaInfo = createNewReplicaObj(block, fsDataSetImpl);
+      ReplicaInfo newReplicaInfo =
+          createNewReplicaObjWithLink(block, fsDataSetImpl);
 
       // Append to file to update its GS
       FSDataOutputStream out = fs.append(filePath, (short) 1);
@@ -1095,6 +1118,7 @@ public class TestFsDatasetImpl {
       out.hflush();
 
       // Call finalizeNewReplica
+      assertTrue(newReplicaInfo.blockDataExists());
       LOG.info("GenerationStamp of old replica: {}",
           block.getGenerationStamp());
       LOG.info("GenerationStamp of new replica: {}", fsDataSetImpl
@@ -1103,6 +1127,9 @@ public class TestFsDatasetImpl {
       LambdaTestUtils.intercept(IOException.class, "Generation Stamp "
           + "should be monotonically increased.",
           () -> fsDataSetImpl.finalizeNewReplica(newReplicaInfo, block));
+      assertFalse(newReplicaInfo.blockDataExists());
+
+      validateFileLen(fs, fileLen, filePath);
     } catch (Exception ex) {
       LOG.info("Exception in testMoveBlockFailure ", ex);
       fail("Exception while testing testMoveBlockFailure ");
@@ -1144,6 +1171,253 @@ public class TestFsDatasetImpl {
   }
 
   /**
+   * Make sure datanode restart can clean up un-finalized links,
+   * if the block is not finalized yet.
+   */
+  @Test(timeout = 30000)
+  public void testDnRestartWithHardLinkInTmp() {
+    MiniDFSCluster cluster = null;
+    try {
+      conf.setBoolean(DFSConfigKeys
+          .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+      conf.setDouble(DFSConfigKeys
+          .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(1)
+          .storageTypes(
+              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
+          .storagesPerDatanode(2)
+          .build();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      Path filePath = new Path("testData");
+      long fileLen = 100;
+
+      ExtendedBlock block = createTestFile(fs, fileLen, filePath);
+
+      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+
+      ReplicaInfo oldReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
+      ReplicaInfo newReplicaInfo =
+          createNewReplicaObjWithLink(block, fsDataSetImpl);
+
+      // Link exists
+      assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
+
+      cluster.restartDataNode(0);
+      cluster.waitDatanodeFullyStarted(cluster.getDataNodes().get(0), 60000);
+      cluster.triggerBlockReports();
+
+      // Un-finalized replica data (hard link) is deleted as they were in /tmp
+      assertFalse(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
+
+      // Old block is there.
+      assertTrue(Files.exists(Paths.get(oldReplicaInfo.getBlockURI())));
+
+      validateFileLen(fs, fileLen, filePath);
+
+    } catch (Exception ex) {
+      LOG.info("Exception in testDnRestartWithHardLinkInTmp ", ex);
+      fail("Exception while testing testDnRestartWithHardLinkInTmp ");
+    } finally {
+      if (cluster.isClusterUp()) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * If new block is finalized and DN restarted,
+   * DiskScanner should clean up the hardlink correctly.
+   */
+  @Test(timeout = 30000)
+  public void testDnRestartWithHardLink() {
+    MiniDFSCluster cluster = null;
+    try {
+      conf.setBoolean(DFSConfigKeys
+          .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+      conf.setDouble(DFSConfigKeys
+          .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(1)
+          .storageTypes(
+              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
+          .storagesPerDatanode(2)
+          .build();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      Path filePath = new Path("testData");
+      long fileLen = 100;
+
+      ExtendedBlock block = createTestFile(fs, fileLen, filePath);
+
+      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+
+      final ReplicaInfo oldReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
+
+      fsDataSetImpl.finalizeNewReplica(
+          createNewReplicaObjWithLink(block, fsDataSetImpl), block);
+
+      ReplicaInfo newReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
+
+      cluster.restartDataNode(0);
+      cluster.waitDatanodeFullyStarted(cluster.getDataNodes().get(0), 60000);
+      cluster.triggerBlockReports();
+
+      assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
+      assertTrue(Files.exists(Paths.get(oldReplicaInfo.getBlockURI())));
+
+      DirectoryScanner scanner = new DirectoryScanner(
+          cluster.getDataNodes().get(0).getFSDataset(), conf);
+      scanner.start();
+      scanner.run();
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override public Boolean get() {
+          return !Files.exists(Paths.get(oldReplicaInfo.getBlockURI()));
+        }
+      }, 100, 10000);
+      assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
+
+      validateFileLen(fs, fileLen, filePath);
+
+    } catch (Exception ex) {
+      LOG.info("Exception in testDnRestartWithHardLink ", ex);
+      fail("Exception while testing testDnRestartWithHardLink ");
+    } finally {
+      if (cluster.isClusterUp()) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test(timeout = 30000)
+  public void testMoveBlockSuccessWithSameMountMove() {
+    MiniDFSCluster cluster = null;
+    try {
+      conf.setBoolean(DFSConfigKeys
+          .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+      conf.setDouble(DFSConfigKeys
+          .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(1)
+          .storageTypes(
+              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
+          .storagesPerDatanode(2)
+          .build();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      Path filePath = new Path("testData");
+      long fileLen = 100;
+
+      ExtendedBlock block = createTestFile(fs, fileLen, filePath);
+
+      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+      assertEquals(StorageType.DISK,
+          fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType());
+
+      FsDatasetImpl fsDataSetImplSpy =
+          spy((FsDatasetImpl) dataNode.getFSDataset());
+      fsDataSetImplSpy.moveBlockAcrossStorage(
+          block, StorageType.ARCHIVE, null);
+
+      // Make sure it is done through hardlink
+      verify(fsDataSetImplSpy).moveBlock(any(), any(), any(), eq(true));
+
+      assertEquals(StorageType.ARCHIVE,
+          fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType());
+      validateFileLen(fs, fileLen, filePath);
+
+    } catch (Exception ex) {
+      LOG.info("Exception in testMoveBlockSuccessWithSameMountMove ", ex);
+      fail("testMoveBlockSuccessWithSameMountMove operation should succeed");
+    } finally {
+      if (cluster.isClusterUp()) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  // Move should fail if the volume on same
+  // mount has no space.
+  @Test(timeout = 30000)
+  public void testMoveBlockWithSameMountMoveWithoutSpace() {
+    MiniDFSCluster cluster = null;
+    try {
+      conf.setBoolean(DFSConfigKeys
+          .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+      conf.setDouble(DFSConfigKeys
+          .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.0);
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(1)
+          .storageTypes(
+              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
+          .storagesPerDatanode(2)
+          .build();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      Path filePath = new Path("testData");
+      long fileLen = 100;
+
+      ExtendedBlock block = createTestFile(fs, fileLen, filePath);
+
+      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+      assertEquals(StorageType.DISK,
+          fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType());
+
+      FsDatasetImpl fsDataSetImplSpy =
+          spy((FsDatasetImpl) dataNode.getFSDataset());
+      fsDataSetImplSpy.moveBlockAcrossStorage(
+          block, StorageType.ARCHIVE, null);
+
+      fail("testMoveBlockWithSameMountMoveWithoutSpace operation"
+          + " should fail");
+    } catch (Exception ex) {
+      assertTrue(ex instanceof DiskChecker.DiskOutOfSpaceException);
+    } finally {
+      if (cluster.isClusterUp()) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  // More tests on shouldConsiderSameMountVolume.
+  @Test(timeout = 10000)
+  public void testShouldConsiderSameMountVolume() throws IOException {
+    FsVolumeImpl volume = new FsVolumeImplBuilder()
+        .setConf(conf)
+        .setDataset(dataset)
+        .setStorageID("storage-id")
+        .setStorageDirectory(
+            new StorageDirectory(StorageLocation.parse(BASE_DIR)))
+        .build();
+    assertFalse(dataset.shouldConsiderSameMountVolume(volume,
+        StorageType.ARCHIVE, null));
+
+    conf.setBoolean(DFSConfigKeys
+        .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+    conf.setDouble(DFSConfigKeys
+        .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
+        0.5);
+    volume = new FsVolumeImplBuilder()
+        .setConf(conf)
+        .setDataset(dataset)
+        .setStorageID("storage-id")
+        .setStorageDirectory(
+            new StorageDirectory(StorageLocation.parse(BASE_DIR)))
+        .build();
+    assertTrue(dataset.shouldConsiderSameMountVolume(volume,
+        StorageType.ARCHIVE, null));
+    assertTrue(dataset.shouldConsiderSameMountVolume(volume,
+        StorageType.ARCHIVE, ""));
+    assertFalse(dataset.shouldConsiderSameMountVolume(volume,
+        StorageType.DISK, null));
+    assertFalse(dataset.shouldConsiderSameMountVolume(volume,
+        StorageType.ARCHIVE, "target"));
+  }
+
+  /**
    * Create a new temporary replica of replicaInfo object in another volume.
    *
    * @param block - Extended Block
    * @param fsDataSetImpl - FsDatasetImpl reference
    * @throws IOException
    */
@@ -1159,6 +1433,38 @@ public class TestFsDatasetImpl {
   }
 
   /**
+   * Create a new temporary replica of replicaInfo object in another volume.
+   *
+   * @param block - Extended Block
+   * @param fsDataSetImpl - FsDatasetImpl reference
+   * @throws IOException
+   */
+  private ReplicaInfo createNewReplicaObjWithLink(ExtendedBlock block,
+      FsDatasetImpl fsDataSetImpl) throws IOException {
+    ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
+    FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
+    return fsDataSetImpl.moveReplicaToVolumeOnSameMount(block, replicaInfo,
+        destVolume.obtainReference());
+  }
+
+  private ExtendedBlock createTestFile(FileSystem fs,
+      long fileLen, Path filePath) throws IOException {
+    DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, 0);
+    return DFSTestUtil.getFirstBlock(fs, filePath);
+  }
+
+  private void validateFileLen(FileSystem fs,
+      long fileLen, Path filePath) throws IOException {
+    // Read data file to make sure it is good.
+    InputStream in = fs.open(filePath);
+    int bytesCount = 0;
+    while (in.read() != -1) {
+      bytesCount++;
+    }
+    assertTrue(fileLen <= bytesCount);
+  }
+
+  /**
    * Finds a new destination volume for block.
    *
    * @param block - Extended Block
@@ -1225,7 +1531,8 @@ public class TestFsDatasetImpl {
     ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
     FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
     assertNotNull("Destination volume should not be null.", destVolume);
-    fsDataSetImpl.moveBlock(block, replicaInfo, destVolume.obtainReference());
+    fsDataSetImpl.moveBlock(block, replicaInfo,
+        destVolume.obtainReference(), false);
     // Trigger block report to update block info in NN
     cluster.triggerBlockReports();
     blkReader.read(buf, 512, 512);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 5393b90..481c7cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -446,6 +446,12 @@ public class TestMover {
     final Configuration conf = new HdfsConfiguration();
     initConf(conf);
     testWithinSameNode(conf);
+    // Test movement with hardlink, when same disk tiering is enabled.
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+    conf.setDouble(DFSConfigKeys
+        .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+    testWithinSameNode(conf);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, false);
   }
 
   private void checkMovePaths(List<Path> actual, Path... expected) {
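For anyone wiring this up in their own test or cluster code: the two settings used throughout the tests above are the whole enablement story, and 0.5 is just the DISK/ARCHIVE capacity split those tests happen to pick:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class EnableSameDiskTiering {
      static Configuration conf() {
        Configuration conf = new Configuration();
        conf.setBoolean(
            DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
        conf.setDouble(
            DFSConfigKeys.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
            0.5);
        return conf;
      }
    }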