HDFS-9683. DiskBalancer: Add cancelPlan implementation. (Contributed by Anu Engineer)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/98476406 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/98476406 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/98476406 Branch: refs/heads/trunk Commit: 9847640603ace60d169206a40a256f988b314983 Parents: e646c2e Author: Arpit Agarwal <a...@apache.org> Authored: Thu Mar 3 17:00:52 2016 -0800 Committer: Arpit Agarwal <a...@apache.org> Committed: Thu Jun 23 18:18:48 2016 -0700 ---------------------------------------------------------------------- .../hadoop-hdfs/HDFS-1312_CHANGES.txt | 3 + .../hadoop/hdfs/server/datanode/DataNode.java | 7 +- .../hdfs/server/datanode/DiskBalancer.java | 26 +++ .../diskbalancer/DiskBalancerException.java | 3 +- .../diskbalancer/TestDiskBalancerRPC.java | 213 ++++++++++++------- 5 files changed, 173 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/98476406/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt index 07403cf..919d73e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt @@ -30,3 +30,6 @@ HDFS-1312 Change Log HDFS-9681. DiskBalancer: Add QueryPlan implementation. (Anu Engineer via Arpit Agarwal) + HDFS-9683. DiskBalancer: Add cancelPlan implementation. (Anu Engineer via + Arpit Agarwal) + http://git-wip-us.apache.org/repos/asf/hadoop/blob/98476406/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 56585a8..126deb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3337,12 +3337,15 @@ public class DataNode extends ReconfigurableBase this.diskBalancer.submitPlan(planID, planVersion, plan, bandwidth, false); } + /** + * Cancels a running plan. + * @param planID - Hash string that identifies a plan + */ @Override public void cancelDiskBalancePlan(String planID) throws IOException { checkSuperuserPrivilege(); - throw new DiskBalancerException("Not Implemented", - DiskBalancerException.Result.INTERNAL_ERROR); + this.diskBalancer.cancelPlan(planID); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/98476406/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index c01fb4e..81dbb2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -197,6 +197,32 @@ public class DiskBalancer { } /** + * Cancels a running plan. + * @param planID - Hash of the plan to cancel. + * @throws DiskBalancerException + */ + public void cancelPlan(String planID) throws DiskBalancerException { + lock.lock(); + try { + checkDiskBalancerEnabled(); + if ((this.planID == null) || (!this.planID.equals(planID))) { + LOG.error("Disk Balancer - No such plan. Cancel plan failed. PlanID: " + + planID); + throw new DiskBalancerException("No such plan.", + DiskBalancerException.Result.NO_SUCH_PLAN); + } + if (!this.future.isDone()) { + this.blockMover.setExitFlag(); + shutdownExecutor(); + } + } finally { + lock.unlock(); + } + } + + + + /** * Throws if Disk balancer is disabled. * * @throws DiskBalancerException http://git-wip-us.apache.org/repos/asf/hadoop/blob/98476406/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java index a5e1581..00fe53d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java @@ -35,7 +35,8 @@ public class DiskBalancerException extends IOException { PLAN_ALREADY_IN_PROGRESS, INVALID_VOLUME, INVALID_MOVE, - INTERNAL_ERROR + INTERNAL_ERROR, + NO_SUCH_PLAN } private final Result result; http://git-wip-us.apache.org/repos/asf/hadoop/blob/98476406/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java index 974e973..e29b3b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * <p/> + * * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -37,6 +37,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN; import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE; import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS; @@ -63,103 +64,163 @@ public class TestDiskBalancerRPC { } @Test - public void testSubmitTestRpc() throws Exception { - final int dnIndex = 0; - cluster.restartDataNode(dnIndex); - cluster.waitActive(); - ClusterConnector nameNodeConnector = - ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); + public void testSubmitPlan() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + int planVersion = rpcTestHelper.getPlanVersion(); + NodePlan plan = rpcTestHelper.getPlan(); + dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + } - DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector); - diskBalancerCluster.readClusterInfo(); - Assert.assertEquals(cluster.getDataNodes().size(), - diskBalancerCluster.getNodes().size()); - diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); + @Test + public void testSubmitPlanWithInvalidHash() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + char hashArray[] = planHash.toCharArray(); + hashArray[0]++; + planHash = String.valueOf(hashArray); + int planVersion = rpcTestHelper.getPlanVersion(); + NodePlan plan = rpcTestHelper.getPlan(); + thrown.expect(DiskBalancerException.class); + dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + } - DataNode dataNode = cluster.getDataNodes().get(dnIndex); - DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID( - dataNode.getDatanodeUuid()); - GreedyPlanner planner = new GreedyPlanner(10.0f, node); - NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort - ()); - planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); - final int planVersion = 1; // So far we support only one version. + @Test + public void testSubmitPlanWithInvalidVersion() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + int planVersion = rpcTestHelper.getPlanVersion(); + planVersion++; + NodePlan plan = rpcTestHelper.getPlan(); + thrown.expect(DiskBalancerException.class); + dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + } - String planHash = DigestUtils.sha512Hex(plan.toJson()); + @Test + public void testSubmitPlanWithInvalidPlan() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + int planVersion = rpcTestHelper.getPlanVersion(); + NodePlan plan = rpcTestHelper.getPlan(); + thrown.expect(DiskBalancerException.class); + dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, ""); + } + @Test + public void testCancelPlan() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + int planVersion = rpcTestHelper.getPlanVersion(); + NodePlan plan = rpcTestHelper.getPlan(); dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + dataNode.cancelDiskBalancePlan(planHash); } @Test - public void testCancelTestRpc() throws Exception { - final int dnIndex = 0; - cluster.restartDataNode(dnIndex); - cluster.waitActive(); - ClusterConnector nameNodeConnector = - ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); - - DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector); - diskBalancerCluster.readClusterInfo(); - Assert.assertEquals(cluster.getDataNodes().size(), - diskBalancerCluster.getNodes().size()); - diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); - DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0); - GreedyPlanner planner = new GreedyPlanner(10.0f, node); - NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort - ()); - planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); - - final int planVersion = 0; // So far we support only one version. - DataNode dataNode = cluster.getDataNodes().get(dnIndex); - String planHash = DigestUtils.sha512Hex(plan.toJson()); - - // Since submitDiskBalancerPlan is not implemented yet, it throws an - // Exception, this will be modified with the actual implementation. - try { - dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); - } catch (DiskBalancerException ex) { - // Let us ignore this for time being. - } + public void testCancelNonExistentPlan() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + char hashArray[] = planHash.toCharArray(); + hashArray[0]++; + planHash = String.valueOf(hashArray); + NodePlan plan = rpcTestHelper.getPlan(); thrown.expect(DiskBalancerException.class); dataNode.cancelDiskBalancePlan(planHash); } @Test - public void testQueryTestRpc() throws Exception { - final int dnIndex = 0; - cluster.restartDataNode(dnIndex); - cluster.waitActive(); - ClusterConnector nameNodeConnector = - ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); - - DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster - (nameNodeConnector); - diskBalancerCluster.readClusterInfo(); - Assert.assertEquals(cluster.getDataNodes().size(), - diskBalancerCluster.getNodes().size()); - diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); - DataNode dataNode = cluster.getDataNodes().get(dnIndex); - DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID( - dataNode.getDatanodeUuid()); - GreedyPlanner planner = new GreedyPlanner(10.0f, node); - NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort - ()); - planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); - - final int planVersion = 1; // So far we support only one version. - String planHash = DigestUtils.sha512Hex(plan.toJson()); - dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + public void testCancelEmptyPlan() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = ""; + NodePlan plan = rpcTestHelper.getPlan(); + thrown.expect(DiskBalancerException.class); + dataNode.cancelDiskBalancePlan(planHash); + } + + + @Test + public void testQueryPlan() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + int planVersion = rpcTestHelper.getPlanVersion(); + NodePlan plan = rpcTestHelper.getPlan(); + + dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan(); Assert.assertTrue(status.getResult() == PLAN_UNDER_PROGRESS || status.getResult() == PLAN_DONE); } @Test - public void testgetDiskBalancerSetting() throws Exception { + public void testQueryPlanWithoutSubmit() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + + DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan(); + Assert.assertTrue(status.getResult() == NO_PLAN); + } + + @Test + public void testGetDiskBalancerSetting() throws Exception { final int dnIndex = 0; DataNode dataNode = cluster.getDataNodes().get(dnIndex); thrown.expect(DiskBalancerException.class); dataNode.getDiskBalancerSetting( DiskBalancerConstants.DISKBALANCER_BANDWIDTH); } + + private class RpcTestHelper { + private NodePlan plan; + private int planVersion; + private DataNode dataNode; + private String planHash; + + public NodePlan getPlan() { + return plan; + } + + public int getPlanVersion() { + return planVersion; + } + + public DataNode getDataNode() { + return dataNode; + } + + public String getPlanHash() { + return planHash; + } + + public RpcTestHelper invoke() throws Exception { + final int dnIndex = 0; + cluster.restartDataNode(dnIndex); + cluster.waitActive(); + ClusterConnector nameNodeConnector = + ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); + + DiskBalancerCluster diskBalancerCluster = + new DiskBalancerCluster(nameNodeConnector); + diskBalancerCluster.readClusterInfo(); + Assert.assertEquals(cluster.getDataNodes().size(), + diskBalancerCluster.getNodes().size()); + diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); + dataNode = cluster.getDataNodes().get(dnIndex); + DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID( + dataNode.getDatanodeUuid()); + GreedyPlanner planner = new GreedyPlanner(10.0f, node); + plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort()); + planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); + planVersion = 1; + planHash = DigestUtils.sha512Hex(plan.toJson()); + return this; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org