This is an automated email from the ASF dual-hosted git repository.

jing9 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7743d40  HDFS-15549. Use Hardlink to move replica between DISK and ARCHIVE storage if on same filesystem mount (#2583). Contributed by Leon Gao.
7743d40 is described below

commit 7743d40ac5b6fba73204feba22d2256d4e9d70f0
Author: LeonGao <lia...@uber.com>
AuthorDate: Fri Jan 15 16:28:11 2021 -0800

    HDFS-15549. Use Hardlink to move replica between DISK and ARCHIVE storage if on same filesystem mount (#2583). Contributed by Leon Gao.
---
 .../main/java/org/apache/hadoop/fs/HardLink.java   |   4 +-
 .../java/org/apache/hadoop/fs/StorageType.java     |   5 +
 .../hdfs/server/datanode/DirectoryScanner.java     |   3 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     | 136 ++++++++-
 .../datanode/fsdataset/impl/FsVolumeImpl.java      |  23 +-
 .../datanode/fsdataset/impl/FsVolumeList.java      |  29 +-
 .../datanode/fsdataset/impl/TestFsDatasetImpl.java | 317 ++++++++++++++++++++-
 .../apache/hadoop/hdfs/server/mover/TestMover.java |   6 +
 8 files changed, 502 insertions(+), 21 deletions(-)
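
For context, a minimal sketch (not part of the patch) of how a test or client might enable
the same-disk-tiering hardlink path. It uses the two DFSConfigKeys constants exercised in
the tests below; the class name and the 0.5 reserve value are illustrative assumptions.

    // Sketch: enable DISK/ARCHIVE on the same mount so block moves between the
    // two storage types can use a hardlink instead of a copy.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class SameDiskTieringConfigSketch {
      public static Configuration sameDiskTieringConf() {
        Configuration conf = new Configuration();
        // Allow DISK and ARCHIVE storage directories to share one disk mount.
        conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
        // Reserve half of the mount's capacity for ARCHIVE by default (illustrative value).
        conf.setDouble(
            DFSConfigKeys.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
        return conf;
      }
    }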

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java
index 30f793d..887ae0c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java
@@ -153,11 +153,11 @@ public class HardLink {
    */
 
   /**
-   * Creates a hardlink 
+   * Creates a hardlink.
    * @param file - existing source file
    * @param linkName - desired target link file
    */
-  public static void createHardLink(File file, File linkName) 
+  public static void createHardLink(File file, File linkName)
       throws IOException {
     if (file == null) {
       throw new IOException(
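
For readers unfamiliar with the helper touched above, a minimal usage sketch of
HardLink.createHardLink; the paths and class name below are hypothetical, not from the patch:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.fs.HardLink;

    public class HardLinkSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical paths: an existing DISK replica and the desired link name
        // in an ARCHIVE directory on the same filesystem mount.
        File existing = new File("/data1/hdfs/disk/blk_1073741825");
        File linkName = new File("/data1/hdfs/archive/blk_1073741825");
        // Creates a second directory entry for the same inode; no data is copied,
        // so the move is fast and a crash mid-move cannot lose the replica.
        HardLink.createHardLink(existing, linkName);
      }
    }
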
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
index e11c129..b17864a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
@@ -92,6 +92,11 @@ public enum StorageType {
     return StorageType.valueOf(StringUtils.toUpperCase(s));
   }
 
+  public static boolean allowSameDiskTiering(StorageType storageType) {
+    return storageType == StorageType.DISK
+        || storageType == StorageType.ARCHIVE;
+  }
+
   private static List<StorageType> getNonTransientTypes() {
     List<StorageType> nonTransientTypes = new ArrayList<>();
     for (StorageType t : VALUES) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index d835108..66cfa01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -322,7 +322,8 @@ public class DirectoryScanner implements Runnable {
    * Start the scanner. The scanner will run every
    * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
    */
-  void start() {
+  @VisibleForTesting
+  public void start() {
     shouldRun.set(true);
     long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index f5bfd92..c3dbf48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -48,6 +48,7 @@ import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -994,6 +995,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         smallBufferSize, conf);
   }
 
+  /**
+   * Link the block and meta files for the given block to the given destination.
+   * @return the new meta and block files.
+   * @throws IOException
+   */
+  static File[] hardLinkBlockFiles(long blockId, long genStamp,
+      ReplicaInfo srcReplica, File destRoot) throws IOException {
+    final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
+    // blockName is the same as the filename for the block
+    final File dstFile = new File(destDir, srcReplica.getBlockName());
+    final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
+    return hardLinkBlockFiles(srcReplica, dstMeta, dstFile);
+  }
+
   static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta,
                                File dstFile, boolean calculateChecksum,
                                int smallBufferSize, final Configuration conf)
@@ -1026,6 +1041,34 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return new File[] {dstMeta, dstFile};
   }
 
+  static File[] hardLinkBlockFiles(ReplicaInfo srcReplica, File dstMeta,
+      File dstFile)
+      throws IOException {
+    // Create the parent folder if it does not exist.
+    srcReplica.getFileIoProvider()
+        .mkdirs(srcReplica.getVolume(), dstFile.getParentFile());
+    try {
+      HardLink.createHardLink(
+          new File(srcReplica.getBlockURI()), dstFile);
+    } catch (IOException e) {
+      throw new IOException("Failed to hardLink "
+          + srcReplica + " block file to "
+          + dstFile, e);
+    }
+    try {
+      HardLink.createHardLink(
+          new File(srcReplica.getMetadataURI()), dstMeta);
+    } catch (IOException e) {
+      throw new IOException("Failed to hardLink "
+          + srcReplica + " metadata to "
+          + dstMeta, e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Linked " + srcReplica.getBlockURI() + " to " + dstFile);
+    }
+    return new File[]{dstMeta, dstFile};
+  }
+
   /**
    * Move block files from one storage to another storage.
    * @return Returns the Old replicaInfo
@@ -1058,12 +1101,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     FsVolumeReference volumeRef = null;
+    boolean shouldConsiderSameMountVolume =
+        shouldConsiderSameMountVolume(replicaInfo.getVolume(),
+            targetStorageType, targetStorageId);
+    boolean useVolumeOnSameMount = false;
+
     try (AutoCloseableLock lock = datasetReadLock.acquire()) {
-      volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
-          block.getNumBytes());
+      if (shouldConsiderSameMountVolume) {
+        volumeRef = volumes.getVolumeByMount(targetStorageType,
+            ((FsVolumeImpl) replicaInfo.getVolume()).getMount(),
+            block.getNumBytes());
+        if (volumeRef != null) {
+          useVolumeOnSameMount = true;
+        }
+      }
+      if (!useVolumeOnSameMount) {
+        volumeRef = volumes.getNextVolume(
+            targetStorageType,
+            targetStorageId,
+            block.getNumBytes()
+        );
+      }
     }
     try {
-      moveBlock(block, replicaInfo, volumeRef);
+      moveBlock(block, replicaInfo, volumeRef, useVolumeOnSameMount);
     } finally {
       if (volumeRef != null) {
         volumeRef.close();
@@ -1075,19 +1136,53 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /**
+   * When DISK and ARCHIVE are configured on the same disk mount,
+   * check if we should look for the counterpart volume on that mount.
+   */
+  @VisibleForTesting
+  boolean shouldConsiderSameMountVolume(FsVolumeSpi fsVolume,
+      StorageType targetStorageType, String targetStorageID) {
+    if (targetStorageID != null && !targetStorageID.isEmpty()) {
+      return false;
+    }
+    if (!(fsVolume instanceof FsVolumeImpl)
+        || ((FsVolumeImpl) fsVolume).getMount().isEmpty()) {
+      return false;
+    }
+    StorageType sourceStorageType = fsVolume.getStorageType();
+    // Source/dest storage types must differ.
+    if (sourceStorageType == targetStorageType) {
+      return false;
+    }
+    // Source/dest storage types are either DISK or ARCHIVE.
+    return StorageType.allowSameDiskTiering(sourceStorageType)
+        && StorageType.allowSameDiskTiering(targetStorageType);
+  }
+
+  /**
    * Moves a block from a given volume to another.
    *
    * @param block       - Extended Block
    * @param replicaInfo - ReplicaInfo
    * @param volumeRef   - Volume Ref - Closed by caller.
+   * @param moveBlockToLocalMount - Whether to use the hardlink shortcut
+   *                                to move the block on the same mount.
    * @return newReplicaInfo
    * @throws IOException
    */
   @VisibleForTesting
   ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
-      FsVolumeReference volumeRef) throws IOException {
-    ReplicaInfo newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
-        volumeRef);
+      FsVolumeReference volumeRef, boolean moveBlockToLocalMount)
+      throws IOException {
+    ReplicaInfo newReplicaInfo;
+    if (moveBlockToLocalMount) {
+      newReplicaInfo = moveReplicaToVolumeOnSameMount(block, replicaInfo,
+          volumeRef);
+    } else {
+      newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
+          volumeRef);
+    }
+
     finalizeNewReplica(newReplicaInfo, block);
     removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
     return newReplicaInfo;
@@ -1129,6 +1224,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /**
+   * Shortcut that uses a hardlink to move blocks on the same mount.
+   * This is useful when moving blocks between storage types on the same disk mount.
+   * Two cases need to be considered carefully:
+   * 1) Datanode restart in the middle should not cause data loss.
+   *    We use hardlink to avoid this.
+   * 2) Finalized blocks can be reopened to append.
+   *    This is already handled by dataset lock and gen stamp.
+   *    See HDFS-12942
+   *
+   * @param block       - Extended Block
+   * @param replicaInfo - ReplicaInfo
+   * @param volumeRef   - Volume Ref - Closed by caller.
+   * @return newReplicaInfo new replica object created in specified volume.
+   * @throws IOException
+   */
+  @VisibleForTesting
+  ReplicaInfo moveReplicaToVolumeOnSameMount(ExtendedBlock block,
+      ReplicaInfo replicaInfo,
+      FsVolumeReference volumeRef) throws IOException {
+    FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
+    // Move files to temp dir first
+    ReplicaInfo newReplicaInfo = targetVolume.hardLinkBlockToTmpLocation(block,
+        replicaInfo);
+    return newReplicaInfo;
+  }
+
+  /**
    * Finalizes newReplica by calling finalizeReplica internally.
    *
    * @param newReplicaInfo - ReplicaInfo
@@ -1177,7 +1299,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     try {
-      moveBlock(block, replicaInfo, volumeRef);
+      moveBlock(block, replicaInfo, volumeRef, false);
     } finally {
       if (volumeRef != null) {
         volumeRef.close();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index ccb76b1..07e14fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -484,9 +484,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
     // should share the same amount of reserved capacity.
     // When calculating actual non dfs used,
     // exclude DFS used capacity by another volume.
-    if (enableSameDiskTiering &&
-        (storageType == StorageType.DISK
-            || storageType == StorageType.ARCHIVE)) {
+    if (enableSameDiskTiering
+        && StorageType.allowSameDiskTiering(storageType)) {
       StorageType counterpartStorageType = storageType == StorageType.DISK
           ? StorageType.ARCHIVE : StorageType.DISK;
       FsVolumeReference counterpartRef = dataset
@@ -1529,6 +1528,24 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return newReplicaInfo;
   }
 
+  public ReplicaInfo hardLinkBlockToTmpLocation(ExtendedBlock block,
+      ReplicaInfo replicaInfo) throws IOException {
+
+    File[] blockFiles = FsDatasetImpl.hardLinkBlockFiles(block.getBlockId(),
+        block.getGenerationStamp(), replicaInfo,
+        getTmpDir(block.getBlockPoolId()));
+
+    ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
+        .setBlockId(replicaInfo.getBlockId())
+        .setGenerationStamp(replicaInfo.getGenerationStamp())
+        .setFsVolume(this)
+        .setDirectoryToUse(blockFiles[0].getParentFile())
+        .setBytesToReserve(0)
+        .build();
+    newReplicaInfo.setNumBytes(blockFiles[1].length());
+    return newReplicaInfo;
+  }
+
   public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
       long genStamp,
       ReplicaInfo replicaInfo,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 2d6593d..38cf399 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -111,6 +111,30 @@ class FsVolumeList {
     }
   }
 
+  /**
+   * Get volume by disk mount to place a block.
+   * This is useful for same disk tiering.
+   *
+   * @param storageType The desired {@link StorageType}
+   * @param mount Disk mount of the volume
+   * @param blockSize Free space needed on the volume
+   * @return a reference to a volume on the given mount, or null if none is suitable
+   * @throws IOException
+   */
+  FsVolumeReference getVolumeByMount(StorageType storageType,
+      String mount, long blockSize) throws IOException {
+    if (!enableSameDiskTiering) {
+      return null;
+    }
+    FsVolumeReference volume = mountVolumeMap
+        .getVolumeRefByMountAndStorageType(mount, storageType);
+    // Check if volume has enough capacity
+    if (volume != null && volume.getVolume().getAvailable() > blockSize) {
+      return volume;
+    }
+    return null;
+  }
+
   /** 
    * Get next volume.
    *
@@ -354,9 +378,8 @@ class FsVolumeList {
    * Check if same disk tiering is applied to the volume.
    */
   private boolean isSameDiskTieringApplied(FsVolumeImpl target) {
-    return enableSameDiskTiering &&
-        (target.getStorageType() == StorageType.DISK
-            || target.getStorageType() == StorageType.ARCHIVE);
+    return enableSameDiskTiering
+        && StorageType.allowSameDiskTiering(target.getStorageType());
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 33a6c4f..80437ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.function.Supplier;
+
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 
 import java.io.OutputStream;
@@ -68,6 +71,7 @@ import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
@@ -1070,24 +1074,43 @@ public class TestFsDatasetImpl {
     }
   }
 
+  /**
+   * When moving blocks via hardlink or copy,
+   * if an append happens in the middle,
+   * the block movement should fail and the hardlink should be removed.
+   */
   @Test(timeout = 30000)
   public void testMoveBlockFailure() {
+    // Test copy
+    testMoveBlockFailure(conf);
+    // Test hardlink
+    conf.setBoolean(DFSConfigKeys
+        .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+    conf.setDouble(DFSConfigKeys
+        .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+    testMoveBlockFailure(conf);
+  }
+
+  private void testMoveBlockFailure(Configuration config) {
     MiniDFSCluster cluster = null;
     try {
+
       cluster = new MiniDFSCluster.Builder(conf)
           .numDataNodes(1)
-          .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
+          .storageTypes(
+              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
           .storagesPerDatanode(2)
           .build();
       FileSystem fs = cluster.getFileSystem();
       DataNode dataNode = cluster.getDataNodes().get(0);
 
       Path filePath = new Path("testData");
-      DFSTestUtil.createFile(fs, filePath, 100, (short) 1, 0);
-      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      long fileLen = 100;
+      ExtendedBlock block = createTestFile(fs, fileLen, filePath);
 
       FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
-      ReplicaInfo newReplicaInfo = createNewReplicaObj(block, fsDataSetImpl);
+      ReplicaInfo newReplicaInfo =
+          createNewReplicaObjWithLink(block, fsDataSetImpl);
 
       // Append to file to update its GS
       FSDataOutputStream out = fs.append(filePath, (short) 1);
@@ -1095,6 +1118,7 @@ public class TestFsDatasetImpl {
       out.hflush();
 
       // Call finalizeNewReplica
+      assertTrue(newReplicaInfo.blockDataExists());
       LOG.info("GenerationStamp of old replica: {}",
           block.getGenerationStamp());
       LOG.info("GenerationStamp of new replica: {}", fsDataSetImpl
@@ -1103,6 +1127,9 @@ public class TestFsDatasetImpl {
       LambdaTestUtils.intercept(IOException.class, "Generation Stamp "
               + "should be monotonically increased.",
           () -> fsDataSetImpl.finalizeNewReplica(newReplicaInfo, block));
+      assertFalse(newReplicaInfo.blockDataExists());
+
+      validateFileLen(fs, fileLen, filePath);
     } catch (Exception ex) {
       LOG.info("Exception in testMoveBlockFailure ", ex);
       fail("Exception while testing testMoveBlockFailure ");
@@ -1144,6 +1171,253 @@ public class TestFsDatasetImpl {
   }
 
   /**
+   * Make sure a datanode restart cleans up un-finalized hardlinks in the tmp directory
+   * when the new replica has not been finalized yet.
+   */
+  @Test(timeout = 30000)
+  public void testDnRestartWithHardLinkInTmp() {
+    MiniDFSCluster cluster = null;
+    try {
+      conf.setBoolean(DFSConfigKeys
+          .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+      conf.setDouble(DFSConfigKeys
+          .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(1)
+          .storageTypes(
+              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
+          .storagesPerDatanode(2)
+          .build();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      Path filePath = new Path("testData");
+      long fileLen = 100;
+
+      ExtendedBlock block = createTestFile(fs, fileLen, filePath);
+
+      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+
+      ReplicaInfo oldReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
+      ReplicaInfo newReplicaInfo =
+          createNewReplicaObjWithLink(block, fsDataSetImpl);
+
+      // Link exists
+      assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
+
+      cluster.restartDataNode(0);
+      cluster.waitDatanodeFullyStarted(cluster.getDataNodes().get(0), 60000);
+      cluster.triggerBlockReports();
+
+      // Un-finalized replica data (hard link) is deleted as it was in /tmp
+      assertFalse(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
+
+      // Old block is there.
+      assertTrue(Files.exists(Paths.get(oldReplicaInfo.getBlockURI())));
+
+      validateFileLen(fs, fileLen, filePath);
+
+    } catch (Exception ex) {
+      LOG.info("Exception in testDnRestartWithHardLinkInTmp ", ex);
+      fail("Exception while testing testDnRestartWithHardLinkInTmp ");
+    } finally {
+      if (cluster != null && cluster.isClusterUp()) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * If the new block is finalized and the DN is restarted,
+   * the DirectoryScanner should clean up the duplicate old replica correctly.
+   */
+  @Test(timeout = 30000)
+  public void testDnRestartWithHardLink() {
+    MiniDFSCluster cluster = null;
+    try {
+      conf.setBoolean(DFSConfigKeys
+          .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+      conf.setDouble(DFSConfigKeys
+          .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(1)
+          .storageTypes(
+              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
+          .storagesPerDatanode(2)
+          .build();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      Path filePath = new Path("testData");
+      long fileLen = 100;
+
+      ExtendedBlock block = createTestFile(fs, fileLen, filePath);
+
+      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+
+      final ReplicaInfo oldReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
+
+      fsDataSetImpl.finalizeNewReplica(
+          createNewReplicaObjWithLink(block, fsDataSetImpl), block);
+
+      ReplicaInfo newReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
+
+      cluster.restartDataNode(0);
+      cluster.waitDatanodeFullyStarted(cluster.getDataNodes().get(0), 60000);
+      cluster.triggerBlockReports();
+
+      assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
+      assertTrue(Files.exists(Paths.get(oldReplicaInfo.getBlockURI())));
+
+      DirectoryScanner scanner = new DirectoryScanner(
+          cluster.getDataNodes().get(0).getFSDataset(), conf);
+      scanner.start();
+      scanner.run();
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override public Boolean get() {
+          return !Files.exists(Paths.get(oldReplicaInfo.getBlockURI()));
+        }
+      }, 100, 10000);
+      assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
+
+      validateFileLen(fs, fileLen, filePath);
+
+    } catch (Exception ex) {
+      LOG.info("Exception in testDnRestartWithHardLink ", ex);
+      fail("Exception while testing testDnRestartWithHardLink ");
+    } finally {
+      if (cluster != null && cluster.isClusterUp()) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test(timeout = 30000)
+  public void testMoveBlockSuccessWithSameMountMove() {
+    MiniDFSCluster cluster = null;
+    try {
+      conf.setBoolean(DFSConfigKeys
+          .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+      conf.setDouble(DFSConfigKeys
+          .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(1)
+          .storageTypes(
+              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
+          .storagesPerDatanode(2)
+          .build();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      Path filePath = new Path("testData");
+      long fileLen = 100;
+
+      ExtendedBlock block = createTestFile(fs, fileLen, filePath);
+
+      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+      assertEquals(StorageType.DISK,
+          fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType());
+
+      FsDatasetImpl fsDataSetImplSpy =
+          spy((FsDatasetImpl) dataNode.getFSDataset());
+      fsDataSetImplSpy.moveBlockAcrossStorage(
+          block, StorageType.ARCHIVE, null);
+
+      // Make sure the move is done through the hardlink path
+      verify(fsDataSetImplSpy).moveBlock(any(), any(), any(), eq(true));
+
+      assertEquals(StorageType.ARCHIVE,
+          fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType());
+      validateFileLen(fs, fileLen, filePath);
+
+    } catch (Exception ex) {
+      LOG.info("Exception in testMoveBlockSuccessWithSameMountMove ", ex);
+      fail("testMoveBlockSuccessWithSameMountMove operation should succeed");
+    } finally {
+      if (cluster != null && cluster.isClusterUp()) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  // Move should fail if the volume on the same mount has no space.
+  @Test(timeout = 30000)
+  public void testMoveBlockWithSameMountMoveWithoutSpace() {
+    MiniDFSCluster cluster = null;
+    try {
+      conf.setBoolean(DFSConfigKeys
+          .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+      conf.setDouble(DFSConfigKeys
+          .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.0);
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(1)
+          .storageTypes(
+              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
+          .storagesPerDatanode(2)
+          .build();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      Path filePath = new Path("testData");
+      long fileLen = 100;
+
+      ExtendedBlock block = createTestFile(fs, fileLen, filePath);
+
+      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+      assertEquals(StorageType.DISK,
+          fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType());
+
+      FsDatasetImpl fsDataSetImplSpy =
+          spy((FsDatasetImpl) dataNode.getFSDataset());
+      fsDataSetImplSpy.moveBlockAcrossStorage(
+          block, StorageType.ARCHIVE, null);
+
+      fail("testMoveBlockWithSameMountMoveWithoutSpace operation" +
+          " should fail");
+    } catch (Exception ex) {
+      assertTrue(ex instanceof DiskChecker.DiskOutOfSpaceException);
+    } finally {
+      if (cluster != null && cluster.isClusterUp()) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  // More tests on shouldConsiderSameMountVolume.
+  @Test(timeout = 10000)
+  public void testShouldConsiderSameMountVolume() throws IOException {
+    FsVolumeImpl volume = new FsVolumeImplBuilder()
+        .setConf(conf)
+        .setDataset(dataset)
+        .setStorageID("storage-id")
+        .setStorageDirectory(
+            new StorageDirectory(StorageLocation.parse(BASE_DIR)))
+        .build();
+    assertFalse(dataset.shouldConsiderSameMountVolume(volume,
+        StorageType.ARCHIVE, null));
+
+    conf.setBoolean(DFSConfigKeys
+            .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+    conf.setDouble(DFSConfigKeys
+            .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
+        0.5);
+    volume = new FsVolumeImplBuilder()
+        .setConf(conf)
+        .setDataset(dataset)
+        .setStorageID("storage-id")
+        .setStorageDirectory(
+            new StorageDirectory(StorageLocation.parse(BASE_DIR)))
+        .build();
+    assertTrue(dataset.shouldConsiderSameMountVolume(volume,
+        StorageType.ARCHIVE, null));
+    assertTrue(dataset.shouldConsiderSameMountVolume(volume,
+        StorageType.ARCHIVE, ""));
+    assertFalse(dataset.shouldConsiderSameMountVolume(volume,
+        StorageType.DISK, null));
+    assertFalse(dataset.shouldConsiderSameMountVolume(volume,
+        StorageType.ARCHIVE, "target"));
+  }
+
+  /**
    * Create a new temporary replica of replicaInfo object in another volume.
    *
    * @param block         - Extended Block
@@ -1159,6 +1433,38 @@ public class TestFsDatasetImpl {
   }
 
   /**
+   * Create a new temporary replica of the block via hardlink in another volume.
+   *
+   * @param block         - Extended Block
+   * @param fsDataSetImpl - FsDatasetImpl reference
+   * @throws IOException
+   */
+  private ReplicaInfo createNewReplicaObjWithLink(ExtendedBlock block,
+      FsDatasetImpl fsDataSetImpl) throws IOException {
+    ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
+    FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
+    return fsDataSetImpl.moveReplicaToVolumeOnSameMount(block, replicaInfo,
+        destVolume.obtainReference());
+  }
+
+  private ExtendedBlock createTestFile(FileSystem fs,
+      long fileLen, Path filePath) throws IOException {
+    DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, 0);
+    return DFSTestUtil.getFirstBlock(fs, filePath);
+  }
+
+  private void validateFileLen(FileSystem fs,
+      long fileLen, Path filePath) throws IOException {
+    // Read data file to make sure it is good.
+    int bytesCount = 0;
+    try (InputStream in = fs.open(filePath)) {
+      while (in.read() != -1) {
+        bytesCount++;
+      }
+    }
+    assertTrue(fileLen <= bytesCount);
+  }
+
+  /**
    * Finds a new destination volume for block.
    *
    * @param block         - Extended Block
@@ -1225,7 +1531,8 @@ public class TestFsDatasetImpl {
       ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
       FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
       assertNotNull("Destination volume should not be null.", destVolume);
-      fsDataSetImpl.moveBlock(block, replicaInfo, destVolume.obtainReference());
+      fsDataSetImpl.moveBlock(block, replicaInfo,
+          destVolume.obtainReference(), false);
       // Trigger block report to update block info in NN
       cluster.triggerBlockReports();
       blkReader.read(buf, 512, 512);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 5393b90..481c7cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -446,6 +446,12 @@ public class TestMover {
     final Configuration conf = new HdfsConfiguration();
     initConf(conf);
     testWithinSameNode(conf);
+    // Test movement with hardlink, when same disk tiering is enabled.
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+    conf.setDouble(DFSConfigKeys
+        .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+    testWithinSameNode(conf);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, false);
   }
 
   private void checkMovePaths(List<Path> actual, Path... expected) {
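
For reference, a self-contained paraphrase of the eligibility check this patch adds via
FsDatasetImpl#shouldConsiderSameMountVolume and StorageType#allowSameDiskTiering. This is a
simplified sketch (it omits the check that the source volume is an FsVolumeImpl with a
non-empty mount), not the committed code:

    import org.apache.hadoop.fs.StorageType;

    public final class SameMountEligibilitySketch {
      private SameMountEligibilitySketch() {
      }

      // The hardlink shortcut is only considered when no explicit target storage
      // ID is requested and the move is between DISK and ARCHIVE (either way).
      static boolean shouldConsiderSameMount(StorageType source,
          StorageType target, String targetStorageId) {
        if (targetStorageId != null && !targetStorageId.isEmpty()) {
          return false;
        }
        return source != target
            && StorageType.allowSameDiskTiering(source)
            && StorageType.allowSameDiskTiering(target);
      }

      public static void main(String[] args) {
        System.out.println(shouldConsiderSameMount(
            StorageType.DISK, StorageType.ARCHIVE, null));   // true
        System.out.println(shouldConsiderSameMount(
            StorageType.DISK, StorageType.DISK, null));      // false
        System.out.println(shouldConsiderSameMount(
            StorageType.DISK, StorageType.ARCHIVE, "sid"));  // false
      }
    }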

