HDFS-10598. DiskBalancer does not execute multi-steps plan. Contributed by Lei (Eddy) Xu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d84ab8a5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d84ab8a5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d84ab8a5

Branch: refs/heads/HADOOP-12756
Commit: d84ab8a5786c9320e4708b2f0d54f978fee5ba95
Parents: 255ea45
Author: Wei-Chiu Chuang <weic...@apache.org>
Authored: Tue Jul 26 16:01:50 2016 -0700
Committer: Wei-Chiu Chuang <weic...@apache.org>
Committed: Tue Jul 26 16:01:50 2016 -0700

----------------------------------------------------------------------
 .../hdfs/server/datanode/DiskBalancer.java      | 25 +++--
 .../server/diskbalancer/TestDiskBalancer.java   | 99 +++++++++++++++++++-
 2 files changed, 110 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d84ab8a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index c6948f88..cb2c913 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -486,10 +486,13 @@ public class DiskBalancer {
       public void run() {
         Thread.currentThread().setName("DiskBalancerThread");
         LOG.info("Executing Disk balancer plan. Plan ID -  " + planID);
-
-        for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
-            workMap.entrySet()) {
-          blockMover.copyBlocks(entry.getKey(), entry.getValue());
+        try {
+          for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
+              workMap.entrySet()) {
+            blockMover.copyBlocks(entry.getKey(), entry.getValue());
+          }
+        } finally {
+          blockMover.setExitFlag();
         }
       }
     });
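
With the hunk above, the plan-execution thread walks every VolumePair in workMap and calls blockMover.setExitFlag() only once, from the finally block, after the last pair has finished. A minimal, self-contained sketch of that control flow (hypothetical names, not the actual Hadoop classes):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the corrected control flow: every step of a multi-step plan runs
// before the mover is told to exit, and the exit flag is set exactly once.
public class MultiStepPlanSketch {

  static class BlockMover {
    private final AtomicBoolean shouldExit = new AtomicBoolean(false);

    void copyBlocks(String volumePair) {
      // A failing step breaks out of its own copy loop ("break" in the hunks
      // below) instead of setting the shared exit flag, so later volume pairs
      // still get their turn.
      System.out.println("copying blocks for " + volumePair);
    }

    void setExitFlag() {
      shouldExit.set(true);
    }
  }

  public static void main(String[] args) {
    Map<String, String> workMap = new LinkedHashMap<>();
    workMap.put("disk0->disk1", "step 1");
    workMap.put("disk2->disk3", "step 2");

    BlockMover blockMover = new BlockMover();
    try {
      for (Map.Entry<String, String> entry : workMap.entrySet()) {
        blockMover.copyBlocks(entry.getKey());
      }
    } finally {
      // Signal completion only after every volume pair has been processed.
      blockMover.setExitFlag();
    }
  }
}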
@@ -943,8 +946,7 @@ public class DiskBalancer {
               LOG.error("Exceeded the max error count. source {}, dest: {} " +
                       "error count: {}", source.getBasePath(),
                   dest.getBasePath(), item.getErrorCount());
-              this.setExitFlag();
-              continue;
+              break;
             }
 
             // Check for the block tolerance constraint.
@@ -953,17 +955,15 @@ public class DiskBalancer {
                       "blocks.",
                   source.getBasePath(), dest.getBasePath(),
                   item.getBytesCopied(), item.getBlocksCopied());
-              this.setExitFlag();
-              continue;
+              break;
             }
 
             ExtendedBlock block = getNextBlock(poolIters, item);
             // we are not able to find any blocks to copy.
             if (block == null) {
-              this.setExitFlag();
               LOG.error("No source blocks, exiting the copy. Source: {}, " +
                   "dest:{}", source.getBasePath(), dest.getBasePath());
-              continue;
+              break;
             }
 
             // check if someone told us exit, treat this as an interruption
@@ -971,7 +971,7 @@ public class DiskBalancer {
             // for the thread, since both getNextBlock and moveBlocAcrossVolume
             // can take some time.
             if (!shouldRun()) {
-              continue;
+              break;
             }
 
             long timeUsed;
@@ -990,8 +990,7 @@ public class DiskBalancer {
               LOG.error("Destination volume: {} does not have enough space to" 
+
                   " accommodate a block. Block Size: {} Exiting from" +
                   " copyBlocks.", dest.getBasePath(), block.getNumBytes());
-              this.setExitFlag();
-              continue;
+              break;
             }
 
             LOG.debug("Moved block with size {} from  {} to {}",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d84ab8a5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
index 1e10539..f27b931 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
@@ -25,12 +25,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
@@ -44,9 +46,11 @@ import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -190,7 +194,6 @@ public class TestDiskBalancer {
       assertTrue(plan.getVolumeSetPlans().size() > 0);
       plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
 
-
       // Submit the plan and wait till the execution is done.
       newDN.submitDiskBalancerPlan(planID, 1, planJson, false);
       String jmxString = newDN.getDiskBalancerStatus();
@@ -237,6 +240,100 @@ public class TestDiskBalancer {
     }
   }
 
+  @Test(timeout=60000)
+  public void testBalanceDataBetweenMultiplePairsOfVolumes()
+      throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final int DEFAULT_BLOCK_SIZE = 2048;
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    final int NUM_DATANODES = 1;
+    final long CAP = 512 * 1024;
+    final Path testFile = new Path("/testfile");
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(NUM_DATANODES)
+        .storageCapacities(new long[]{CAP, CAP, CAP, CAP})
+        .storagesPerDatanode(4)
+        .build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      TestBalancer.createFile(cluster, testFile, CAP, (short) 1, 0);
+
+      DFSTestUtil.waitReplication(fs, testFile, (short) 1);
+      DataNode dnNode = cluster.getDataNodes().get(0);
+      // Move data out of two volumes to make them empty.
+      try (FsDatasetSpi.FsVolumeReferences refs =
+               dnNode.getFSDataset().getFsVolumeReferences()) {
+        assertEquals(4, refs.size());
+        for (int i = 0; i < refs.size(); i += 2) {
+          FsVolumeImpl source = (FsVolumeImpl) refs.get(i);
+          FsVolumeImpl dest = (FsVolumeImpl) refs.get(i + 1);
+          assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
+          DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
+              source, dest);
+          assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
+        }
+      }
+
+      cluster.restartDataNodes();
+      cluster.waitActive();
+
+      // Start up a disk balancer and read the cluster info.
+      final DataNode dataNode = cluster.getDataNodes().get(0);
+      ClusterConnector nameNodeConnector =
+          ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+
+      DiskBalancerCluster diskBalancerCluster =
+          new DiskBalancerCluster(nameNodeConnector);
+      diskBalancerCluster.readClusterInfo();
+      List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>();
+      // Rewrite the capacity in the model to show that disks need
+      // re-balancing.
+      setVolumeCapacity(diskBalancerCluster, CAP, "DISK");
+      nodesToProcess.add(diskBalancerCluster.getNodeByUUID(
+          dataNode.getDatanodeUuid()));
+      diskBalancerCluster.setNodesToProcess(nodesToProcess);
+
+      // Compute a plan.
+      List<NodePlan> clusterPlan = diskBalancerCluster.computePlan(10.0f);
+
+      NodePlan plan = clusterPlan.get(0);
+      assertEquals(2, plan.getVolumeSetPlans().size());
+      plan.setNodeUUID(dnNode.getDatanodeUuid());
+      plan.setTimeStamp(Time.now());
+      String planJson = plan.toJson();
+      String planID = DigestUtils.sha512Hex(planJson);
+
+      dataNode.submitDiskBalancerPlan(planID, 1, planJson, false);
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            return dataNode.queryDiskBalancerPlan().getResult() ==
+                DiskBalancerWorkStatus.Result.PLAN_DONE;
+          } catch (IOException ex) {
+            return false;
+          }
+        }
+      }, 1000, 100000);
+      assertEquals(dataNode.queryDiskBalancerPlan().getResult(),
+          DiskBalancerWorkStatus.Result.PLAN_DONE);
+
+      try (FsDatasetSpi.FsVolumeReferences refs =
+               dataNode.getFSDataset().getFsVolumeReferences()) {
+        for (FsVolumeSpi vol : refs) {
+          assertTrue(DiskBalancerTestUtil.getBlockCount(vol) > 0);
+        }
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
   * Sets all Disks capacity to size specified.
    *
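
The new test submits the two-step plan and then polls queryDiskBalancerPlan() every second, for up to 100 seconds, until the datanode reports PLAN_DONE, before asserting that every volume holds blocks again. A minimal, generic sketch of that poll-until-done idea (a hypothetical helper, not the GenericTestUtils API used in the test):

import java.util.function.BooleanSupplier;

// Hypothetical polling helper: evaluate a condition at a fixed interval and
// give up after a timeout, mirroring the wait loop in the new test.
public final class PollUntil {

  public static boolean poll(BooleanSupplier condition, long intervalMillis,
      long timeoutMillis) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
      if (condition.getAsBoolean()) {
        return true;   // e.g. the disk balancer reported PLAN_DONE
      }
      Thread.sleep(intervalMillis);
    }
    return false;      // timed out; the caller should fail the test
  }

  public static void main(String[] args) throws InterruptedException {
    final long start = System.currentTimeMillis();
    // Toy condition: succeeds once two seconds have elapsed.
    boolean done = poll(
        () -> System.currentTimeMillis() - start > 2000, 1000, 100000);
    System.out.println("plan done: " + done);
  }
}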

