HDFS-12072. Provide fairness between EC and non-EC recovery tasks. Contributed by Eddy Xu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b2989488 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b2989488 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b2989488 Branch: refs/heads/HDFS-7240 Commit: b29894889742dda654cd88a7ce72a4e51fccb328 Parents: ab1a8ae Author: Andrew Wang <w...@apache.org> Authored: Thu Aug 17 15:26:11 2017 -0700 Committer: Andrew Wang <w...@apache.org> Committed: Thu Aug 17 15:26:11 2017 -0700 ---------------------------------------------------------------------- .../blockmanagement/DatanodeDescriptor.java | 6 +- .../server/blockmanagement/DatanodeManager.java | 45 ++++++--- .../blockmanagement/TestDatanodeManager.java | 96 +++++++++++++++----- 3 files changed, 108 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2989488/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 2bd4a20..d35894c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -661,7 +661,11 @@ public class DatanodeDescriptor extends DatanodeInfo { return erasurecodeBlocks.size(); } - public List<BlockTargetPair> getReplicationCommand(int maxTransfers) { + int getNumberOfReplicateBlocks() { + return replicateBlocks.size(); + } + + List<BlockTargetPair> getReplicationCommand(int maxTransfers) { return 
replicateBlocks.poll(maxTransfers); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2989488/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 78783ca..c75bcea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1663,21 +1663,38 @@ public class DatanodeManager { } final List<DatanodeCommand> cmds = new ArrayList<>(); - // check pending replication - List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand( - maxTransfers); - if (pendingList != null) { - cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, - pendingList)); - maxTransfers -= pendingList.size(); - } - // check pending erasure coding tasks - List<BlockECReconstructionInfo> pendingECList = nodeinfo - .getErasureCodeCommand(maxTransfers); - if (pendingECList != null) { - cmds.add(new BlockECReconstructionCommand( - DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList)); + // Allocate _approximately_ maxTransfers pending tasks to DataNode. + // NN chooses pending tasks based on the ratio between the lengths of + // replication and erasure-coded block queues. 
+ int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks(); + int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded(); + int totalBlocks = totalReplicateBlocks + totalECBlocks; + if (totalBlocks > 0) { + int numReplicationTasks = (int) Math.ceil( + (double) (totalReplicateBlocks * maxTransfers) / totalBlocks); + int numECTasks = (int) Math.ceil( + (double) (totalECBlocks * maxTransfers) / totalBlocks); + + if (LOG.isDebugEnabled()) { + LOG.debug("Pending replication tasks: " + numReplicationTasks + + " erasure-coded tasks: " + numECTasks); + } + // check pending replication tasks + List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand( + numReplicationTasks); + if (pendingList != null && !pendingList.isEmpty()) { + cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, + pendingList)); + } + // check pending erasure coding tasks + List<BlockECReconstructionInfo> pendingECList = nodeinfo + .getErasureCodeCommand(numECTasks); + if (pendingECList != null && !pendingECList.isEmpty()) { + cmds.add(new BlockECReconstructionCommand( + DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList)); + } } + // check block invalidation Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); if (blks != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2989488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index de002f4..286f4a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -25,6 +25,7 @@ import java.net.URL; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -500,46 +501,93 @@ public class TestDatanodeManager { "127.0.0.1:23456", bothAgain.get(1).getInfoAddr()); } - @Test - public void testPendingRecoveryTasks() throws IOException { + /** + * Verify the correctness of pending recovery process. + * + * @param numReplicationBlocks the number of replication blocks in the queue. + * @param numECBlocks number of EC blocks in the queue. + * @param maxTransfers the maxTransfer value. + * @param numReplicationTasks the number of replication tasks polled from + * the queue. + * @param numECTasks the number of EC tasks polled from the queue. + * + * @throws IOException + */ + private void verifyPendingRecoveryTasks( + int numReplicationBlocks, int numECBlocks, + int maxTransfers, int numReplicationTasks, int numECTasks) + throws IOException { FSNamesystem fsn = Mockito.mock(FSNamesystem.class); Mockito.when(fsn.hasWriteLock()).thenReturn(true); Configuration conf = new Configuration(); DatanodeManager dm = Mockito.spy(mockDatanodeManager(fsn, conf)); - int maxTransfers = 20; - int numPendingTasks = 7; - int numECTasks = maxTransfers - numPendingTasks; - DatanodeDescriptor nodeInfo = Mockito.mock(DatanodeDescriptor.class); Mockito.when(nodeInfo.isRegistered()).thenReturn(true); Mockito.when(nodeInfo.getStorageInfos()) .thenReturn(new DatanodeStorageInfo[0]); - List<BlockTargetPair> pendingList = - Collections.nCopies(numPendingTasks, new BlockTargetPair(null, null)); - Mockito.when(nodeInfo.getReplicationCommand(maxTransfers)) - .thenReturn(pendingList); - List<BlockECReconstructionInfo> ecPendingList = - Collections.nCopies(numECTasks, null); + if (numReplicationBlocks > 0) { + 
Mockito.when(nodeInfo.getNumberOfReplicateBlocks()) + .thenReturn(numReplicationBlocks); + + List<BlockTargetPair> tasks = + Collections.nCopies( + Math.min(numReplicationTasks, numReplicationBlocks), + new BlockTargetPair(null, null)); + Mockito.when(nodeInfo.getReplicationCommand(numReplicationTasks)) + .thenReturn(tasks); + } + + if (numECBlocks > 0) { + Mockito.when(nodeInfo.getNumberOfBlocksToBeErasureCoded()) + .thenReturn(numECBlocks); + + List<BlockECReconstructionInfo> tasks = + Collections.nCopies(numECTasks, null); + Mockito.when(nodeInfo.getErasureCodeCommand(numECTasks)) + .thenReturn(tasks); + } - Mockito.when(nodeInfo.getErasureCodeCommand(numECTasks)) - .thenReturn(ecPendingList); DatanodeRegistration dnReg = Mockito.mock(DatanodeRegistration.class); Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo); - DatanodeCommand[] cmds = dm.handleHeartbeat( dnReg, new StorageReport[1], "bp-123", 0, 0, 10, maxTransfers, 0, null, SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); - assertEquals(2, cmds.length); - assertTrue(cmds[0] instanceof BlockCommand); - BlockCommand replicaCmd = (BlockCommand) cmds[0]; - assertEquals(numPendingTasks, replicaCmd.getBlocks().length); - assertEquals(numPendingTasks, replicaCmd.getTargets().length); - assertTrue(cmds[1] instanceof BlockECReconstructionCommand); - BlockECReconstructionCommand ecRecoveryCmd = - (BlockECReconstructionCommand) cmds[1]; - assertEquals(numECTasks, ecRecoveryCmd.getECTasks().size()); + long expectedNumCmds = Arrays.stream( + new int[]{numReplicationTasks, numECTasks}) + .filter(x -> x > 0) + .count(); + assertEquals(expectedNumCmds, cmds.length); + + int idx = 0; + if (numReplicationTasks > 0) { + assertTrue(cmds[idx] instanceof BlockCommand); + BlockCommand cmd = (BlockCommand) cmds[0]; + assertEquals(numReplicationTasks, cmd.getBlocks().length); + assertEquals(numReplicationTasks, cmd.getTargets().length); + idx++; + } + + if (numECTasks > 0) { + assertTrue(cmds[idx] instanceof 
BlockECReconstructionCommand); + BlockECReconstructionCommand cmd = + (BlockECReconstructionCommand) cmds[idx]; + assertEquals(numECTasks, cmd.getECTasks().size()); + } + + Mockito.verify(nodeInfo).getReplicationCommand(numReplicationTasks); + Mockito.verify(nodeInfo).getErasureCodeCommand(numECTasks); + } + + @Test + public void testPendingRecoveryTasks() throws IOException { + // Tasks are split according to the ratio between queue lengths. + verifyPendingRecoveryTasks(20, 20, 20, 10, 10); + verifyPendingRecoveryTasks(40, 10, 20, 16, 4); + + // Load tasks approximately when the ratio between queue lengths is large. + verifyPendingRecoveryTasks(400, 1, 20, 20, 1); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org