Author: arp
Date: Mon Dec 16 00:58:40 2013
New Revision: 1551093

URL: http://svn.apache.org/r1551093
Log:
HDFS-5406. Send incremental block reports for all storages in a single call.

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1551093&r1=1551092&r2=1551093&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Dec 16 00:58:40 2013 @@ -451,6 +451,9 @@ Trunk (Unreleased) HDFS-5626. dfsadmin -report shows incorrect cache values. (cmccabe) + HDFS-5406. Send incremental block reports for all storages in a + single call. (Arpit Agarwal) + BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS HDFS-4985. 
Add storage type to the protocol and expose it in block report Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1551093&r1=1551092&r2=1551093&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Mon Dec 16 00:58:40 2013 @@ -22,6 +22,7 @@ import static org.apache.hadoop.util.Tim import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -273,7 +274,8 @@ class BPServiceActor implements Runnable private void reportReceivedDeletedBlocks() throws IOException { // Generate a list of the pending reports for each storage under the lock - Map<String, ReceivedDeletedBlockInfo[]> blockArrays = Maps.newHashMap(); + ArrayList<StorageReceivedDeletedBlocks> reports = + new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size()); synchronized (pendingIncrementalBRperStorage) { for (Map.Entry<String, PerStoragePendingIncrementalBR> entry : pendingIncrementalBRperStorage.entrySet()) { @@ -286,33 +288,34 @@ class BPServiceActor implements Runnable pendingReceivedRequests = (pendingReceivedRequests > rdbi.length ? (pendingReceivedRequests - rdbi.length) : 0); - blockArrays.put(storageUuid, rdbi); + reports.add(new StorageReceivedDeletedBlocks(storageUuid, rdbi)); } } } + if (reports.size() == 0) { + // Nothing new to report. 
+ return; + } + // Send incremental block reports to the Namenode outside the lock - for (Map.Entry<String, ReceivedDeletedBlockInfo[]> entry : - blockArrays.entrySet()) { - final String storageUuid = entry.getKey(); - final ReceivedDeletedBlockInfo[] rdbi = entry.getValue(); - - StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( - storageUuid, rdbi) }; - boolean success = false; - try { - bpNamenode.blockReceivedAndDeleted(bpRegistration, - bpos.getBlockPoolId(), report); - success = true; - } finally { - if (!success) { - synchronized (pendingIncrementalBRperStorage) { + boolean success = false; + try { + bpNamenode.blockReceivedAndDeleted(bpRegistration, + bpos.getBlockPoolId(), + reports.toArray(new StorageReceivedDeletedBlocks[reports.size()])); + success = true; + } finally { + if (!success) { + synchronized (pendingIncrementalBRperStorage) { + for (StorageReceivedDeletedBlocks report : reports) { // If we didn't succeed in sending the report, put all of the // blocks back onto our queue, but only in the case where we // didn't put something newer in the meantime. 
PerStoragePendingIncrementalBR perStorageMap = - pendingIncrementalBRperStorage.get(storageUuid); - pendingReceivedRequests += perStorageMap.putMissingBlockInfos(rdbi); + pendingIncrementalBRperStorage.get(report.getStorageID()); + pendingReceivedRequests += + perStorageMap.putMissingBlockInfos(report.getBlocks()); } } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1551093&r1=1551092&r2=1551093&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Mon Dec 16 00:58:40 2013 @@ -1006,6 +1006,7 @@ class NameNodeRpcServer implements Namen public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException { verifyRequest(nodeReg); + metrics.incrBlockReceivedAndDeletedOps(); if(blockStateChangeLog.isDebugEnabled()) { blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " +"from "+nodeReg+" "+receivedAndDeletedBlocks.length Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1551093&r1=1551092&r2=1551093&view=diff ============================================================================== --- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Mon Dec 16 00:58:40 2013 @@ -71,6 +71,8 @@ public class NameNodeMetrics { MutableCounterLong listSnapshottableDirOps; @Metric("Number of snapshotDiffReport operations") MutableCounterLong snapshotDiffReportOps; + @Metric("Number of blockReceivedAndDeleted calls") + MutableCounterLong blockReceivedAndDeletedOps; @Metric("Journal transactions") MutableRate transactions; @Metric("Journal syncs") MutableRate syncs; @@ -209,6 +211,10 @@ public class NameNodeMetrics { snapshotDiffReportOps.incr(); } + public void incrBlockReceivedAndDeletedOps() { + blockReceivedAndDeletedOps.incr(); + } + public void addTransaction(long latency) { transactions.add(latency); } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1551093&r1=1551092&r2=1551093&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Mon Dec 16 00:58:40 2013 @@ -2143,17 +2143,14 @@ public class MiniDFSCluster { } /** - * Get a storage directory for a datanode. There are two storage directories - * per datanode: + * Get a storage directory for a datanode. 
* <ol> * <li><base directory>/data/data<2*dnIndex + 1></li> * <li><base directory>/data/data<2*dnIndex + 2></li> * </ol> * * @param dnIndex datanode index (starts from 0) - * @param dirIndex directory index (0 or 1). Index 0 provides access to the - * first storage directory. Index 1 provides access to the second - * storage directory. + * @param dirIndex directory index. * @return Storage directory */ public static File getStorageDir(int dnIndex, int dirIndex) { @@ -2164,7 +2161,7 @@ public class MiniDFSCluster { * Calculate the DN instance-specific path for appending to the base dir * to determine the location of the storage of a DN instance in the mini cluster * @param dnIndex datanode index - * @param dirIndex directory index (0 or 1). + * @param dirIndex directory index. * @return */ private static String getStorageDirPath(int dnIndex, int dirIndex) { Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1551093&r1=1551092&r2=1551093&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Mon Dec 16 00:58:40 2013 @@ -71,7 +71,15 @@ import org.mockito.invocation.Invocation /** * This test simulates a variety of situations when blocks are being * intentionally corrupted, unexpectedly modified, and so on before a block - * report is happening + * report is happening. 
+ * + * For each test case it runs two variations: + * #1 - For a given DN, the first variation sends block reports for all + * storages in a single call to the NN. + * #2 - For a given DN, the second variation sends block reports for each + * storage in a separate call. + * + * The behavior should be the same in either variation. */ public class TestBlockReport { public static final Log LOG = LogFactory.getLog(TestBlockReport.class); @@ -158,14 +166,120 @@ public class TestBlockReport { } /** + * Utility routine to send block reports to the NN, either in a single call + * or reporting one storage per call. + * + * @param dnR + * @param poolId + * @param reports + * @param needtoSplit + * @throws IOException + */ + private void sendBlockReports(DatanodeRegistration dnR, String poolId, + StorageBlockReport[] reports, boolean needtoSplit) throws IOException { + if (!needtoSplit) { + LOG.info("Sending combined block reports for " + dnR); + cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + } else { + for (StorageBlockReport report : reports) { + LOG.info("Sending block report for storage " + report.getStorage().getStorageID()); + StorageBlockReport[] singletonReport = { report }; + cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport); + } + } + } + + /** + * Test variations blockReport_01 through blockReport_09 with combined + * and split block reports. 
+ */ + @Test + public void blockReportCombined_01() throws IOException { + blockReport_01(false); + } + + @Test + public void blockReportSplit_01() throws IOException { + blockReport_01(true); + } + + @Test + public void blockReportCombined_02() throws IOException { + blockReport_02(false); + } + + @Test + public void blockReportSplit_02() throws IOException { + blockReport_02(true); + } + + @Test + public void blockReportCombined_03() throws IOException { + blockReport_03(false); + } + + @Test + public void blockReportSplit_03() throws IOException { + blockReport_03(true); + } + + @Test + public void blockReportCombined_04() throws IOException { + blockReport_04(false); + } + + @Test + public void blockReportSplit_04() throws IOException { + blockReport_04(true); + } + + @Test + public void blockReportCombined_06() throws Exception { + blockReport_06(false); + } + + @Test + public void blockReportSplit_06() throws Exception { + blockReport_06(true); + } + + @Test + public void blockReportCombined_07() throws Exception { + blockReport_07(false); + } + + @Test + public void blockReportSplit_07() throws Exception { + blockReport_07(true); + } + + @Test + public void blockReportCombined_08() throws Exception { + blockReport_08(false); + } + + @Test + public void blockReportSplit_08() throws Exception { + blockReport_08(true); + } + + @Test + public void blockReportCombined_09() throws Exception { + blockReport_09(false); + } + + @Test + public void blockReportSplit_09() throws Exception { + blockReport_09(true); + } + /** * Test write a file, verifies and closes it. Then the length of the blocks * are messed up and BlockReport is forced. 
* The modification of blocks' length has to be ignored * * @throws java.io.IOException on an error */ - @Test - public void blockReport_01() throws IOException { + private void blockReport_01(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); @@ -198,7 +312,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); List<LocatedBlock> blocksAfterReport = DFSTestUtil.getAllBlocks(fs.open(filePath)); @@ -224,8 +338,7 @@ public class TestBlockReport { * * @throws IOException in case of errors */ - @Test - public void blockReport_02() throws IOException { + private void blockReport_02(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); LOG.info("Running test " + METHOD_NAME); @@ -280,7 +393,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem() .getBlockManager()); @@ -301,8 +414,7 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - @Test - public void blockReport_03() throws IOException { + private void blockReport_03(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); ArrayList<Block> blocks = 
writeFile(METHOD_NAME, FILE_SIZE, filePath); @@ -312,11 +424,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false); - DatanodeCommand dnCmd = - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); - if(LOG.isDebugEnabled()) { - LOG.debug("Got the command: " + dnCmd); - } + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -333,8 +441,7 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - @Test - public void blockReport_04() throws IOException { + private void blockReport_04(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); DFSTestUtil.createFile(fs, filePath, @@ -352,11 +459,7 @@ public class TestBlockReport { DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); - DatanodeCommand dnCmd = - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); - if(LOG.isDebugEnabled()) { - LOG.debug("Got the command: " + dnCmd); - } + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -373,8 +476,7 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - @Test - public void blockReport_06() throws Exception { + private void blockReport_06(boolean splitBlockReports) throws Exception { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -387,7 +489,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] 
reports = getBlockReports(dn, poolId, false, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertEquals("Wrong number of PendingReplication Blocks", 0, cluster.getNamesystem().getUnderReplicatedBlocks()); @@ -406,8 +508,7 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - @Test - public void blockReport_07() throws Exception { + private void blockReport_07(boolean splitBlockReports) throws Exception { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -421,7 +522,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -432,7 +533,7 @@ public class TestBlockReport { cluster.getNamesystem().getPendingReplicationBlocks(), is(0L)); reports = getBlockReports(dn, poolId, true, true); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -458,8 +559,7 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - @Test - public void blockReport_08() throws IOException { + private void blockReport_08(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -483,8 +583,8 @@ public class TestBlockReport { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); 
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - StorageBlockReport[] report = getBlockReports(dn, poolId, false, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, report); + StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertEquals("Wrong number of PendingReplication blocks", blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks()); @@ -500,8 +600,7 @@ public class TestBlockReport { // Similar to BlockReport_08 but corrupts GS and len of the TEMPORARY's // replica block. Expect the same behaviour: NN should simply ignore this // block - @Test - public void blockReport_09() throws IOException { + private void blockReport_09(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -526,8 +625,8 @@ public class TestBlockReport { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - StorageBlockReport[] report = getBlockReports(dn, poolId, true, true); - cluster.getNameNodeRpc().blockReport(dnR, poolId, report); + StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertEquals("Wrong number of PendingReplication blocks", 2, cluster.getNamesystem().getPendingReplicationBlocks());