HDFS-11965: [SPS]: Should give chance to satisfy the low redundant blocks 
before removing the xattr. Contributed by Surendra Singh Lilhore.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/76b47b2b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/76b47b2b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/76b47b2b

Branch: refs/heads/HDFS-10285
Commit: 76b47b2b5915d5243fa3535cfefe926582d3bddb
Parents: 98ad30d
Author: Uma Maheswara Rao G <uma.ganguma...@intel.com>
Authored: Mon Jul 10 18:00:58 2017 -0700
Committer: Rakesh Radhakrishnan <rake...@apache.org>
Committed: Wed Jan 24 11:13:36 2018 +0530

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    |  15 +++
 .../server/namenode/StoragePolicySatisfier.java |  20 +++-
 .../namenode/TestStoragePolicySatisfier.java    | 102 ++++++++++++++++++-
 ...stStoragePolicySatisfierWithStripedFile.java |  90 ++++++++++++++++
 4 files changed, 224 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/76b47b2b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index dd491cd..6dd743a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -4325,6 +4325,21 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
+   * Check whether the file has any low redundancy blocks.
+   */
+  public boolean hasLowRedundancyBlocks(BlockCollection bc) {
+    boolean result = false;
+    for (BlockInfo block : bc.getBlocks()) {
+      short expected = getExpectedRedundancyNum(block);
+      final NumberReplicas n = countNodes(block);
+      if (expected > n.liveReplicas()) {
+        result = true;
+      }
+    }
+    return result;
+  }
+
+  /**
    * Check sufficient redundancy of the blocks in the collection. If any block
    * is needed reconstruction, insert it into the reconstruction queue.
    * Otherwise, if the block is more than the expected replication factor,
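
The new helper above simply reports whether any block in the collection has fewer live replicas than its expected redundancy. A minimal, self-contained sketch of the same check is below; the Block class is a hypothetical stand-in for the real BlockInfo/NumberReplicas types (the patch itself uses getExpectedRedundancyNum() and countNodes()), and like the patch it scans every block rather than breaking early.

    import java.util.Arrays;
    import java.util.List;

    /** Illustrative sketch only: mirrors BlockManager#hasLowRedundancyBlocks. */
    public class LowRedundancyCheckSketch {

      /** Hypothetical stand-in for BlockInfo plus its replica counts. */
      static final class Block {
        final short expectedReplicas; // what getExpectedRedundancyNum(block) would return
        final int liveReplicas;       // what countNodes(block).liveReplicas() would return

        Block(short expectedReplicas, int liveReplicas) {
          this.expectedReplicas = expectedReplicas;
          this.liveReplicas = liveReplicas;
        }
      }

      /** True if any block has fewer live replicas than expected. */
      static boolean hasLowRedundancyBlocks(List<Block> blocks) {
        boolean result = false;
        for (Block b : blocks) {
          if (b.expectedReplicas > b.liveReplicas) {
            result = true; // the patch scans all blocks; it does not break early
          }
        }
        return result;
      }

      public static void main(String[] args) {
        List<Block> file = Arrays.asList(
            new Block((short) 3, 3),  // fully replicated
            new Block((short) 3, 2)); // one replica missing
        System.out.println(hasLowRedundancyBlocks(file)); // prints: true
      }
    }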

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76b47b2b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 1b2afa3..97cbf1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -99,7 +99,10 @@ public class StoragePolicySatisfier implements Runnable {
     // Represents that, the analysis skipped due to some conditions.
     // Example conditions are if no blocks really exists in block collection or
     // if analysis is not required on ec files with unsuitable storage policies
-    BLOCKS_TARGET_PAIRING_SKIPPED;
+    BLOCKS_TARGET_PAIRING_SKIPPED,
+    // Represents that all the reported blocks satisfy the policy, but
+    // some of the blocks have low redundancy.
+    FEW_LOW_REDUNDANCY_BLOCKS
   }
 
   public StoragePolicySatisfier(final Namesystem namesystem,
@@ -247,6 +250,14 @@ public class StoragePolicySatisfier implements Runnable {
               case FEW_BLOCKS_TARGETS_PAIRED:
                 this.storageMovementsMonitor.add(blockCollectionID, false);
                 break;
+              case FEW_LOW_REDUNDANCY_BLOCKS:
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Adding trackID " + blockCollectionID
+                      + " back to retry queue as some of the blocks"
+                      + " are low redundant.");
+                }
+                this.storageMovementNeeded.add(blockCollectionID);
+                break;
               // Just clean Xattrs
               case BLOCKS_TARGET_PAIRING_SKIPPED:
               case BLOCKS_ALREADY_SATISFIED:
@@ -347,11 +358,16 @@ public class StoragePolicySatisfier implements Runnable {
         boolean computeStatus = computeBlockMovingInfos(blockMovingInfos,
             blockInfo, expectedStorageTypes, existing, storages);
         if (computeStatus
-            && status != BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED) {
+            && status != BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED
+            && !blockManager.hasLowRedundancyBlocks(blockCollection)) {
           status = BlocksMovingAnalysisStatus.ALL_BLOCKS_TARGETS_PAIRED;
         } else {
           status = BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED;
         }
+      } else {
+        if (blockManager.hasLowRedundancyBlocks(blockCollection)) {
+          status = BlocksMovingAnalysisStatus.FEW_LOW_REDUNDANCY_BLOCKS;
+        }
       }
     }
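
With the new enum value and the switch case above, a file whose blocks cannot all be target-paired and which also has under-replicated blocks is put back on storageMovementNeeded instead of having its SPS xattr removed. The following is a simplified, self-contained sketch of that requeue decision; the class name, queue type and handleStatus() method are illustrative stand-ins, not the real SPS internals.

    import java.util.ArrayDeque;
    import java.util.Queue;

    /** Illustrative sketch of the retry decision added to StoragePolicySatisfier. */
    public class SpsRetrySketch {

      enum Status {
        ALL_BLOCKS_TARGETS_PAIRED,
        FEW_BLOCKS_TARGETS_PAIRED,
        BLOCKS_ALREADY_SATISFIED,
        BLOCKS_TARGET_PAIRING_SKIPPED,
        FEW_LOW_REDUNDANCY_BLOCKS
      }

      /** Decide what to do with a track ID after analysing its blocks. */
      static void handleStatus(long trackId, Status status, Queue<Long> pendingQueue) {
        switch (status) {
          case FEW_LOW_REDUNDANCY_BLOCKS:
            // Keep the xattr and retry later, once reconstruction restores replicas.
            pendingQueue.add(trackId);
            break;
          case BLOCKS_ALREADY_SATISFIED:
          case BLOCKS_TARGET_PAIRING_SKIPPED:
            // Nothing to move; the xattr can be cleaned up.
            break;
          default:
            // Movement was scheduled; a monitor tracks the attempted items.
            break;
        }
      }

      public static void main(String[] args) {
        Queue<Long> pending = new ArrayDeque<>();
        handleStatus(42L, Status.FEW_LOW_REDUNDANCY_BLOCKS, pending);
        System.out.println(pending); // prints: [42]
      }
    }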
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76b47b2b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index f1a4169..7127895 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 
 import java.io.FileNotFoundException;
@@ -29,6 +30,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
@@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -582,7 +586,9 @@ public class TestStoragePolicySatisfier {
       Assert.assertTrue("SPS should be running as "
           + "no Mover really running", running);
     } finally {
-      hdfsCluster.shutdown();
+      if (hdfsCluster != null) {
+        hdfsCluster.shutdown();
+      }
     }
   }
 
@@ -983,6 +989,100 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Test SPS for low redundancy file blocks.
+   * 1. Create a cluster with 3 datanodes.
+   * 2. Create one file with 3 replicas.
+   * 3. Set the policy and call satisfyStoragePolicy for the file.
+   * 4. Stop the NameNode and the datanodes.
+   * 5. Restart the NameNode with 2 datanodes and wait for block movement.
+   * 6. Start the third datanode.
+   * 7. The third datanode's replica should also be moved to the proper
+   * storage based on the policy.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "3000");
+      StorageType[][] newtypes = new StorageType[][] {
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK}};
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+          .storageTypes(newtypes).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path filePath = new Path("/zeroSizeFile");
+      DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0);
+      fs.setStoragePolicy(filePath, "COLD");
+      List<DataNodeProperties> list = new ArrayList<>();
+      list.add(cluster.stopDataNode(0));
+      list.add(cluster.stopDataNode(0));
+      list.add(cluster.stopDataNode(0));
+      cluster.restartNameNodes();
+      cluster.restartDataNode(list.get(0), true);
+      cluster.restartDataNode(list.get(1), true);
+      cluster.waitActive();
+      fs.satisfyStoragePolicy(filePath);
+      Thread.sleep(3000 * 6);
+      cluster.restartDataNode(list.get(2), true);
+      DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+          StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test SPS for excess redundant file blocks.
+   * 1. Create a cluster with 5 datanodes.
+   * 2. Create one file with 5 replicas.
+   * 3. Set the file replication to 3.
+   * 4. Set the policy and call satisfyStoragePolicy for the file.
+   * 5. Blocks should be moved successfully.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenFileHasExcessRedundancyBlocks() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "3000");
+      StorageType[][] newtypes = new StorageType[][] {
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK}};
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5)
+          .storageTypes(newtypes).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path filePath = new Path("/zeroSizeFile");
+      DFSTestUtil.createFile(fs, filePath, 1024, (short) 5, 0);
+      fs.setReplication(filePath, (short) 3);
+      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+          LogFactory.getLog(BlockStorageMovementAttemptedItems.class));
+      fs.setStoragePolicy(filePath, "COLD");
+      fs.satisfyStoragePolicy(filePath);
+      DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+          StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
+      assertFalse("Log output does not contain expected log message: ",
+          logs.getOutput().contains("some of the blocks are low redundant"));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
       throws IOException {
     ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
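
The tests above exercise the same client-side pattern an application would use: set a storage policy, then ask the SPS to satisfy it. A stand-alone usage sketch follows; the file path is made up, and it assumes fs.defaultFS points at an HDFS cluster that carries the SPS feature from the HDFS-10285 branch (setStoragePolicy and satisfyStoragePolicy are the same DistributedFileSystem calls used in the tests).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    /** Usage sketch only: assumes an HDFS cluster with the SPS feature enabled. */
    public class SatisfyPolicyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf)) {
          DistributedFileSystem dfs = (DistributedFileSystem) fs;
          Path file = new Path("/data/archive-me.log"); // hypothetical file

          // Mark the file COLD and ask the SPS to move its replicas to ARCHIVE.
          dfs.setStoragePolicy(file, "COLD");
          dfs.satisfyStoragePolicy(file);

          // With this patch, if some replicas are missing the SPS re-queues the
          // file and keeps the xattr instead of giving up, so the policy is
          // eventually satisfied once redundancy is restored.
        }
      }
    }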

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76b47b2b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
index eb4a6a3..195c9e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,8 +29,10 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
@@ -293,6 +297,92 @@ public class TestStoragePolicySatisfierWithStripedFile {
   }
 
   /**
+   * Test SPS for low redundancy blocks of a striped file.
+   * 1. Create a cluster with 10 datanodes.
+   * 2. Create one striped file with the default EC policy.
+   * 3. Set the policy and call satisfyStoragePolicy for the file.
+   * 4. Stop the NameNode and the datanodes.
+   * 5. Restart the NameNode with 5 datanodes and wait for block movement.
+   * 6. Start the remaining 5 datanodes.
+   * 7. All replicas should be moved to the proper storage based on the policy.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
+    // start 10 datanodes
+    int numOfDatanodes = 10;
+    int storagesPerDatanode = 2;
+    long capacity = 20 * defaultStripeBlockSize;
+    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      for (int j = 0; j < storagesPerDatanode; j++) {
+        capacities[i][j] = capacity;
+      }
+    }
+
+    final Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys
+        .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+        "3000");
+    initConfWithStripe(conf, defaultStripeBlockSize);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .storagesPerDatanode(storagesPerDatanode)
+        .storageTypes(new StorageType[][]{
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE}})
+        .storageCapacities(capacities)
+        .build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path barDir = new Path("/bar");
+      fs.mkdirs(barDir);
+      // set an EC policy on "/bar" directory
+      fs.setErasureCodingPolicy(barDir, null);
+
+      // write file to barDir
+      final Path fooFile = new Path("/bar/foo");
+      long fileLen = cellSize * dataBlocks;
+      DFSTestUtil.createFile(cluster.getFileSystem(), fooFile,
+          fileLen, (short) 3, 0);
+
+      // Move file to ARCHIVE.
+      fs.setStoragePolicy(barDir, "COLD");
+      // Stop all DataNodes and restart the NameNode.
+      List<DataNodeProperties> list = new ArrayList<>(numOfDatanodes);
+      for (int i = 0; i < numOfDatanodes; i++) {
+        list.add(cluster.stopDataNode(0));
+      }
+      cluster.restartNameNodes();
+      // Restart half datanodes
+      for (int i = 0; i < numOfDatanodes / 2; i++) {
+        cluster.restartDataNode(list.get(i), true);
+      }
+      cluster.waitActive();
+      fs.satisfyStoragePolicy(fooFile);
+      Thread.sleep(3000 * 6);
+      // Start the remaining datanodes.
+      for (int i = numOfDatanodes - 1; i > numOfDatanodes / 2; i--) {
+        cluster.restartDataNode(list.get(i), true);
+      }
+      // verify storage types and locations.
+      waitExpectedStorageType(cluster, fooFile.toString(), fileLen,
+          StorageType.ARCHIVE, 9, 9, 60000);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+
+  /**
    * Tests to verify that for the given path, no blocks under the given path
    * will be scheduled for block movement as there are no available datanode
    * with required storage type.
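
For the striped-file case, the expected redundancy of a block group is its number of internal blocks, i.e. data units plus parity units, which is why the test above waits for 9 ARCHIVE storages. The short arithmetic sketch below assumes the cluster's default erasure coding policy is RS-6-3 (6 data + 3 parity units); the test itself reads these values from the policy rather than hard-coding them.

    /** Arithmetic sketch only: why the striped test expects 9 ARCHIVE storages. */
    public class StripedRedundancySketch {
      public static void main(String[] args) {
        int dataUnits = 6;    // assumed RS-6-3 data units
        int parityUnits = 3;  // assumed RS-6-3 parity units

        // Expected redundancy of a striped block group = all internal blocks.
        int expectedRedundancy = dataUnits + parityUnits;
        System.out.println("expected internal blocks = " + expectedRedundancy); // 9
      }
    }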

