This is an automated email from the ASF dual-hosted git repository. kihwal pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 7a3085d HDFS-15622. Deleted blocks linger in the replications queue. Contributed by Ahmed Hussein. 7a3085d is described below commit 7a3085d552c4b24cdf23da201a300928ada7b8fd Author: Kihwal Lee <kih...@apache.org> AuthorDate: Thu Oct 22 21:01:09 2020 -0500 HDFS-15622. Deleted blocks linger in the replications queue. Contributed by Ahmed Hussein. (cherry picked from commit da1b6e3cc286db00b385f3280627d2b2063b4e59) --- .../blockmanagement/LowRedundancyBlocks.java | 30 ++++++++++---- .../TestLowRedundancyBlockQueues.java | 47 +++++++++++++++++++++- .../blockmanagement/TestReplicationPolicy.java | 17 ++++++-- 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java index f6ef248..d719e93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -500,6 +501,8 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> { * the block count is met or iteration reaches the end of the lowest priority * list, in which case bookmarks for each block list are reset to the heads * of their respective lists. + * If a block is deleted (has invalid bcId), it will be removed from the low + * redundancy queues. * * @param blocksToProcess - number of blocks to fetch from low redundancy * blocks. @@ -515,21 +518,32 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> { int count = 0; int priority = 0; + HashSet<BlockInfo> toRemove = new HashSet<>(); for (; count < blocksToProcess && priority < LEVEL; priority++) { - if (priority == QUEUE_WITH_CORRUPT_BLOCKS) { - // do not choose corrupted blocks. - continue; - } - // Go through all blocks that need reconstructions with current priority. // Set the iterator to the first unprocessed block at this priority level + // We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need + // to look for deleted blocks if any. + final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority); final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark(); final List<BlockInfo> blocks = new LinkedList<>(); - blocksToReconstruct.add(blocks); - // Loop through all remaining blocks in the list. + if (!inCorruptLevel) { + blocksToReconstruct.add(blocks); + } for(; count < blocksToProcess && i.hasNext(); count++) { - blocks.add(i.next()); + BlockInfo block = i.next(); + if (block.isDeleted()) { + toRemove.add(block); + continue; + } + if (!inCorruptLevel) { + blocks.add(block); + } + } + for (BlockInfo bInfo : toRemove) { + remove(bInfo, priority); } + toRemove.clear(); } if (priority == LEVEL || resetIterators) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java index e63a8d8..ef614fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.protocol.Block; @@ -41,6 +42,7 @@ import static org.junit.Assert.fail; public class TestLowRedundancyBlockQueues { private final ErasureCodingPolicy ecPolicy; + private static AtomicLong mockINodeId = new AtomicLong(0); public TestLowRedundancyBlockQueues(ErasureCodingPolicy policy) { ecPolicy = policy; @@ -52,7 +54,15 @@ public class TestLowRedundancyBlockQueues { } private BlockInfo genBlockInfo(long id) { - return new BlockInfoContiguous(new Block(id), (short) 3); + return genBlockInfo(id, false); + } + + private BlockInfo genBlockInfo(long id, boolean isCorruptBlock) { + BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3); + if (!isCorruptBlock) { + bInfo.setBlockCollectionId(mockINodeId.incrementAndGet()); + } + return bInfo; } private BlockInfo genStripedBlockInfo(long id, long numBytes) { @@ -93,6 +103,41 @@ public class TestLowRedundancyBlockQueues { queues.getHighestPriorityECBlockCount()); } + /** + * Tests that deleted blocks should not be returned by + * {@link LowRedundancyBlocks#chooseLowRedundancyBlocks(int, boolean)}. + * @throws Exception + */ + @Test + public void testDeletedBlocks() throws Exception { + int numBlocks = 5; + LowRedundancyBlocks queues = new LowRedundancyBlocks(); + // create 5 blockinfos. The first one is corrupt. + for (int ind = 0; ind < numBlocks; ind++) { + BlockInfo blockInfo = genBlockInfo(ind, ind == 0); + queues.add(blockInfo, 2, 0, 0, 3); + } + List<List<BlockInfo>> blocks; + // Get two blocks from the queue, but we should only get one because first + // block is deleted. + blocks = queues.chooseLowRedundancyBlocks(2, false); + + assertEquals(1, blocks.get(2).size()); + assertEquals(1, blocks.get(2).get(0).getBlockId()); + + // Get the next blocks - should be ID 2 + blocks = queues.chooseLowRedundancyBlocks(1, false); + assertEquals(2, blocks.get(2).get(0).getBlockId()); + + // Get the next block, but also reset this time - should be ID 3 returned + blocks = queues.chooseLowRedundancyBlocks(1, true); + assertEquals(3, blocks.get(2).get(0).getBlockId()); + + // Get one more block and due to resetting the queue it will be block id 1 + blocks = queues.chooseLowRedundancyBlocks(1, false); + assertEquals(1, blocks.get(2).get(0).getBlockId()); + } + @Test public void testQueuePositionCanBeReset() throws Throwable { LowRedundancyBlocks queues = new LowRedundancyBlocks(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 6936e44..efe41723 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.AddBlockFlag; @@ -81,7 +82,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { // The interval for marking a datanode as stale, private static final long staleInterval = DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT; - + private static AtomicLong mockINodeId = new AtomicLong(0); @Rule public ExpectedException exception = ExpectedException.none(); @@ -824,7 +825,15 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { } private BlockInfo genBlockInfo(long id) { - return new BlockInfoContiguous(new Block(id), (short) 3); + return genBlockInfo(id, false); + } + + private BlockInfo genBlockInfo(long id, boolean isBlockCorrupted) { + BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3); + if (!isBlockCorrupted) { + bInfo.setBlockCollectionId(mockINodeId.incrementAndGet()); + } + return bInfo; } /** @@ -847,7 +856,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { // Adding the blocks directly to normal priority neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current(). - nextLong()), 2, 0, 0, 3); + nextLong(), true), 2, 0, 0, 3); } // Lets wait for the replication interval, to start process normal // priority blocks @@ -855,7 +864,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { // Adding the block directly to high priority list neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current(). - nextLong()), 1, 0, 0, 3); + nextLong(), true), 1, 0, 0, 3); // Lets wait for the replication interval Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org