HDFS-12043. Add counters for block re-replication. Contributed by Chen Liang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a9dc5f4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a9dc5f4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a9dc5f4 Branch: refs/heads/trunk Commit: 6a9dc5f44b0c7945e3e9a56248cd4ff80d5c8f0f Parents: a2f0cbd Author: Arpit Agarwal <a...@apache.org> Authored: Fri Jun 30 10:20:12 2017 -0700 Committer: Arpit Agarwal <a...@apache.org> Committed: Fri Jun 30 10:20:12 2017 -0700 ---------------------------------------------------------------------- .../server/blockmanagement/BlockManager.java | 13 ++- .../PendingReconstructionBlocks.java | 8 +- .../namenode/metrics/NameNodeMetrics.java | 18 ++++ .../TestPendingReconstruction.java | 90 +++++++++++++++++++- 4 files changed, 122 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a9dc5f4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index a0c4698..a5ee30b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1851,7 +1851,7 @@ public class BlockManager implements BlockStatsMXBean { (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block)); } - private BlockReconstructionWork scheduleReconstruction(BlockInfo block, + BlockReconstructionWork scheduleReconstruction(BlockInfo block, int priority) { // skip abandoned block or block 
reopened for append if (block.isDeleted() || !block.isCompleteOrCommitted()) { @@ -1873,6 +1873,7 @@ public class BlockManager implements BlockStatsMXBean { if(srcNodes == null || srcNodes.length == 0) { // block can not be reconstructed from any node LOG.debug("Block {} cannot be reconstructed from any node", block); + NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled(); return null; } @@ -1885,6 +1886,7 @@ public class BlockManager implements BlockStatsMXBean { neededReconstruction.remove(block, priority); blockLog.debug("BLOCK* Removing {} from neededReconstruction as" + " it has enough replicas", block); + NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled(); return null; } @@ -1900,6 +1902,7 @@ public class BlockManager implements BlockStatsMXBean { if (block.isStriped()) { if (pendingNum > 0) { // Wait the previous reconstruction to finish. + NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled(); return null; } @@ -3727,8 +3730,8 @@ public class BlockManager implements BlockStatsMXBean { * The given node is reporting that it received a certain block. */ @VisibleForTesting - void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint) - throws IOException { + public void addBlock(DatanodeStorageInfo storageInfo, Block block, + String delHint) throws IOException { DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Decrement number of blocks scheduled to this datanode. 
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with @@ -3751,7 +3754,9 @@ public class BlockManager implements BlockStatsMXBean { BlockInfo storedBlock = getStoredBlock(block); if (storedBlock != null && block.getGenerationStamp() == storedBlock.getGenerationStamp()) { - pendingReconstruction.decrement(storedBlock, node); + if (pendingReconstruction.decrement(storedBlock, node)) { + NameNode.getNameNodeMetrics().incSuccessfulReReplications(); + } } processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, delHintNode); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a9dc5f4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java index 2221d1d..0f20daa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; @@ -97,8 +98,10 @@ class PendingReconstructionBlocks { * for this block. * * @param dn The DataNode that finishes the reconstruction + * @return true if the block is decremented to 0 and got removed. 
*/ - void decrement(BlockInfo block, DatanodeDescriptor dn) { + boolean decrement(BlockInfo block, DatanodeDescriptor dn) { + boolean removed = false; synchronized (pendingReconstructions) { PendingBlockInfo found = pendingReconstructions.get(block); if (found != null) { @@ -106,9 +109,11 @@ class PendingReconstructionBlocks { found.decrementReplicas(dn); if (found.getNumReplicas() <= 0) { pendingReconstructions.remove(block); + removed = true; } } } + return removed; } /** @@ -263,6 +268,7 @@ class PendingReconstructionBlocks { timedOutItems.add(block); } LOG.warn("PendingReconstructionMonitor timed out " + block); + NameNode.getNameNodeMetrics().incTimeoutReReplications(); iter.remove(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a9dc5f4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java index cb81f5a..f2534e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java @@ -58,6 +58,12 @@ public class NameNodeMetrics { @Metric MutableCounterLong createSymlinkOps; @Metric MutableCounterLong getLinkTargetOps; @Metric MutableCounterLong filesInGetListingOps; + @Metric ("Number of successful re-replications") + MutableCounterLong successfulReReplications; + @Metric ("Number of times we failed to schedule a block re-replication.") + MutableCounterLong numTimesReReplicationNotScheduled; + @Metric("Number of timed out block re-replications") + MutableCounterLong timeoutReReplications; @Metric("Number of 
allowSnapshot operations") MutableCounterLong allowSnapshotOps; @Metric("Number of disallowSnapshot operations") @@ -300,6 +306,18 @@ public class NameNodeMetrics { transactionsBatchedInSync.incr(count); } + public void incSuccessfulReReplications() { + successfulReReplications.incr(); + } + + public void incNumTimesReReplicationNotScheduled() { + numTimesReReplicationNotScheduled.incr(); + } + + public void incTimeoutReReplications() { + timeoutReReplications.incr(); + } + public void addSync(long elapsed) { syncs.add(elapsed); for (MutableQuantiles q : syncsQuantiles) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a9dc5f4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java index 7679f9d..29ee953 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java @@ -17,13 +17,21 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; 
import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; +import java.util.concurrent.TimeoutException; +import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -44,6 +52,8 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.Test; import org.mockito.Mockito; @@ -178,7 +188,7 @@ public class TestPendingReconstruction { public void testProcessPendingReconstructions() throws Exception { final Configuration conf = new HdfsConfiguration(); conf.setLong( - DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT); + DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT); MiniDFSCluster cluster = null; Block block; BlockInfo blockInfo; @@ -418,7 +428,7 @@ public class TestPendingReconstruction { CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024); CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFS_REPLICATION_INTERVAL); - CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, + CONF.setInt(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, DFS_REPLICATION_INTERVAL); MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes( DATANODE_COUNT).build(); @@ -471,4 +481,80 @@ public class TestPendingReconstruction { cluster.shutdown(); } } + + /** + * Test the metric counters of the re-replication process. 
+ * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test (timeout = 300000) + public void testReplicationCounter() throws IOException, + InterruptedException, TimeoutException { + HdfsConfiguration conf = new HdfsConfiguration(); + conf.setInt(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1); + conf.setInt(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 1); + MiniDFSCluster tmpCluster = new MiniDFSCluster.Builder(conf).numDataNodes( + DATANODE_COUNT).build(); + tmpCluster.waitActive(); + FSNamesystem fsn = tmpCluster.getNamesystem(0); + fsn.writeLock(); + + try { + BlockManager bm = fsn.getBlockManager(); + BlocksMap blocksMap = bm.blocksMap; + + // create three blockInfos below: blockInfo0 will succeed, blockInfo1 will + // time out, and blockInfo2 will fail to be scheduled for replication. + BlockCollection bc0 = Mockito.mock(BlockCollection.class); + BlockInfo blockInfo0 = new BlockInfoContiguous((short) 3); + blockInfo0.setBlockId(0); + + BlockCollection bc1 = Mockito.mock(BlockCollection.class); + BlockInfo blockInfo1 = new BlockInfoContiguous((short) 3); + blockInfo1.setBlockId(1); + + BlockCollection bc2 = Mockito.mock(BlockCollection.class); + BlockInfo blockInfo2 = new BlockInfoContiguous((short) 3); + blockInfo2.setBlockId(2); + + blocksMap.addBlockCollection(blockInfo0, bc0); + blocksMap.addBlockCollection(blockInfo1, bc1); + blocksMap.addBlockCollection(blockInfo2, bc2); + + PendingReconstructionBlocks pending = bm.pendingReconstruction; + + MetricsRecordBuilder rb = getMetrics("NameNodeActivity"); + assertCounter("SuccessfulReReplications", 0L, rb); + assertCounter("NumTimesReReplicationNotScheduled", 0L, rb); + assertCounter("TimeoutReReplications", 0L, rb); + + // add block0 and block1 to pending queue. + pending.increment(blockInfo0); + pending.increment(blockInfo1); + + // calling addBlock on block0 marks it as successfully replicated; + // not calling addBlock on block1 will make it time out later.
+ DatanodeStorageInfo[] storageInfos = + DFSTestUtil.createDatanodeStorageInfos(1); + bm.addBlock(storageInfos[0], blockInfo0, null); + + // calling scheduleReconstruction on blockInfo2 will fail to schedule + // re-replication because there is no source data to replicate from. + bm.scheduleReconstruction(blockInfo2, 0); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + MetricsRecordBuilder rb = getMetrics("NameNodeActivity"); + return getLongCounter("SuccessfulReReplications", rb) == 1 && + getLongCounter("NumTimesReReplicationNotScheduled", rb) == 1 && + getLongCounter("TimeoutReReplications", rb) == 1; + } + }, 100, 60000); + } finally { + tmpCluster.shutdown(); + fsn.writeUnlock(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org