Repository: hadoop
Updated Branches:
  refs/heads/branch-2 991ce3d63 -> e82135df8
Revert "HDFS-9198. Coalesce IBR processing in the NN. (Daryn Sharp via
umamahesh)" as this patch causes a compilation issue in branch-2.

This reverts commit 991ce3d6300fe742862f07397f5474b1ed8eb9a4.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e82135df
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e82135df
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e82135df

Branch: refs/heads/branch-2
Commit: e82135df8752d78050bae72324090229bd7f4d56
Parents: 991ce3d
Author: Uma Mahesh <umamah...@apache.org>
Authored: Wed Dec 16 19:18:30 2015 -0800
Committer: Uma Mahesh <umamah...@apache.org>
Committed: Wed Dec 16 19:18:30 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 -
 .../server/blockmanagement/BlockManager.java    | 154 +---------------
 .../blockmanagement/DatanodeDescriptor.java     |  14 +-
 .../server/blockmanagement/DatanodeManager.java |   2 +-
 .../blockmanagement/DatanodeStorageInfo.java    |   2 +-
 .../hdfs/server/namenode/CacheManager.java      |   2 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  40 +----
 .../namenode/metrics/NameNodeMetrics.java       |  12 --
 .../hadoop/hdfs/TestDatanodeRegistration.java   | 140 +--------------
 .../blockmanagement/TestBlockManager.java       | 173 +------------------
 .../blockmanagement/TestPendingReplication.java |   4 +-
 .../server/datanode/BlockReportTestBase.java    |  10 +-
 .../datanode/TestIncrementalBrVariations.java   |   8 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |  16 +-
 14 files changed, 39 insertions(+), 540 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b6a258e..0263f17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1637,8 +1637,6 @@ Release 2.8.0 - UNRELEASED
     HDFS-9430. Remove waitForLoadingFSImage since checkNNStartup has ensured
     image loaded and namenode started. (Brahma Reddy Battula via mingma)
 
-    HDFS-9198. Coalesce IBR processing in the NN.
-    (Daryn Sharp via umamahesh)
 
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1b331ba..4e36c0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -34,11 +34,6 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -96,7 +91,6 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.Time;
 
@@ -198,10 +192,6 @@ public class BlockManager implements BlockStatsMXBean {
   /** Replication thread. */
   final Daemon replicationThread = new Daemon(new ReplicationMonitor());
 
-  /** Block report thread for handling async reports. */
-  private final BlockReportProcessingThread blockReportThread =
-      new BlockReportProcessingThread();
-
   /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
   final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
@@ -493,7 +483,6 @@ public class BlockManager implements BlockStatsMXBean {
     datanodeManager.activate(conf);
     this.replicationThread.setName("ReplicationMonitor");
     this.replicationThread.start();
-    this.blockReportThread.start();
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
     bmSafeMode.activate(blockTotal);
   }
@@ -502,9 +491,7 @@ public class BlockManager implements BlockStatsMXBean {
     bmSafeMode.close();
     try {
       replicationThread.interrupt();
-      blockReportThread.interrupt();
       replicationThread.join(3000);
-      blockReportThread.join(3000);
     } catch (InterruptedException ie) {
     }
     datanodeManager.close();
@@ -1890,7 +1877,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     try {
       node = datanodeManager.getDatanode(nodeID);
-      if (node == null || !node.isRegistered()) {
+      if (node == null || !node.isAlive()) {
         throw new IOException(
             "ProcessReport from dead or unregistered node: " + nodeID);
       }
@@ -3242,23 +3229,17 @@ public class BlockManager implements BlockStatsMXBean {
   public void processIncrementalBlockReport(final DatanodeID nodeID,
       final StorageReceivedDeletedBlocks srdb) throws IOException {
     assert namesystem.hasWriteLock();
+    int received = 0;
+    int deleted = 0;
+    int receiving = 0;
     final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
-    if (node == null || !node.isRegistered()) {
+    if (node == null || !node.isAlive()) {
       blockLog.warn("BLOCK* processIncrementalBlockReport"
               + " is received from dead or unregistered node {}", nodeID);
       throw new IOException(
           "Got incremental block report from unregistered or dead node");
     }
-    try {
-      processIncrementalBlockReport(node, srdb);
-    } catch (Exception ex) {
-      node.setForceRegistration(true);
-      throw ex;
-    }
-  }
 
-  private void processIncrementalBlockReport(final DatanodeDescriptor node,
-      final StorageReceivedDeletedBlocks srdb) throws IOException {
     DatanodeStorageInfo storageInfo =
         node.getStorageInfo(srdb.getStorage().getStorageID());
     if (storageInfo == null) {
@@ -3270,10 +3251,6 @@ public class BlockManager implements BlockStatsMXBean {
       storageInfo = node.updateStorage(srdb.getStorage());
     }
 
-    int received = 0;
-    int deleted = 0;
-    int receiving = 0;
-
     for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
       switch (rdbi.getStatus()) {
       case DELETED_BLOCK:
@@ -3291,17 +3268,17 @@ public class BlockManager implements BlockStatsMXBean {
         break;
       default:
         String msg = 
-          "Unknown block status code reported by " + node +
+          "Unknown block status code reported by " + nodeID +
           ": " + rdbi;
         blockLog.warn(msg);
         assert false : msg; // if assertions are enabled, throw.
         break;
       }
       blockLog.debug("BLOCK* block {}: {} is received from {}",
-          rdbi.getStatus(), rdbi.getBlock(), node);
+          rdbi.getStatus(), rdbi.getBlock(), nodeID);
     }
     blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from "
-        + "{} receiving: {}, received: {}, deleted: {}", node, receiving,
+        + "{} receiving: {}, received: {}, deleted: {}", nodeID, receiving,
         received, deleted);
   }
 
@@ -3902,119 +3879,4 @@ public class BlockManager implements BlockStatsMXBean {
     return false;
   }
 
-  // async processing of an action, used for IBRs.
-  public void enqueueBlockOp(final Runnable action) throws IOException {
-    try {
-      blockReportThread.enqueue(action);
-    } catch (InterruptedException ie) {
-      throw new IOException(ie);
-    }
-  }
-
-  // sync batch processing for a full BR.
-  public <T> T runBlockOp(final Callable<T> action)
-      throws IOException {
-    final FutureTask<T> future = new FutureTask<T>(action);
-    enqueueBlockOp(future);
-    try {
-      return future.get();
-    } catch (ExecutionException ee) {
-      Throwable cause = ee.getCause();
-      if (cause == null) {
-        cause = ee;
-      }
-      if (!(cause instanceof IOException)) {
-        cause = new IOException(cause);
-      }
-      throw (IOException)cause;
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      throw new IOException(ie);
-    }
-  }
-
-  @VisibleForTesting
-  public void flushBlockOps() throws IOException {
-    runBlockOp(new Callable<Void>(){
-      @Override
-      public Void call() {
-        return null;
-      }
-    });
-  }
-
-  public int getBlockOpQueueLength() {
-    return blockReportThread.queue.size();
-  }
-
-  private class BlockReportProcessingThread extends Thread {
-    private static final long MAX_LOCK_HOLD_MS = 4;
-    private long lastFull = 0;
-
-    private final BlockingQueue<Runnable> queue =
-        new ArrayBlockingQueue<Runnable>(1024);
-
-    BlockReportProcessingThread() {
-      super("Block report processor");
-      setDaemon(true);
-    }
-
-    @Override
-    public void run() {
-      try {
-        processQueue();
-      } catch (Throwable t) {
-        ExitUtil.terminate(1,
-            getName() + " encountered fatal exception: " + t);
-      }
-    }
-
-    private void processQueue() {
-      while (namesystem.isRunning()) {
-        NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
-        try {
-          Runnable action = queue.take();
-          // batch as many operations in the write lock until the queue
-          // runs dry, or the max lock hold is reached.
-          int processed = 0;
-          namesystem.writeLock();
-          metrics.setBlockOpsQueued(queue.size() + 1);
-          try {
-            long start = Time.monotonicNow();
-            do {
-              processed++;
-              action.run();
-              if (Time.monotonicNow() - start > MAX_LOCK_HOLD_MS) {
-                break;
-              }
-              action = queue.poll();
-            } while (action != null);
-          } finally {
-            namesystem.writeUnlock();
-            metrics.addBlockOpsBatched(processed - 1);
-          }
-        } catch (InterruptedException e) {
-          // ignore unless thread was specifically interrupted.
-          if (Thread.interrupted()) {
-            break;
-          }
-        }
-      }
-      queue.clear();
-    }
-
-    void enqueue(Runnable action) throws InterruptedException {
-      if (!queue.offer(action)) {
-        if (!isAlive() && namesystem.isRunning()) {
-          ExitUtil.terminate(1, getName()+" is not running");
-        }
-        long now = Time.monotonicNow();
-        if (now - lastFull > 4000) {
-          lastFull = now;
-          LOG.info("Block report queue is full");
-        }
-        queue.put(action);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 5b9b73e..fbace92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -189,8 +189,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   // This is an optimization, because contains takes O(n) time on Arraylist
   private boolean isAlive = false;
   private boolean needKeyUpdate = false;
-  private boolean forceRegistration = false;
-
+  
   // A system administrator can tune the balancer bandwidth parameter
   // (dfs.balance.bandwidthPerSec) dynamically by calling
   // "dfsadmin -setBalanacerBandwidth <newbandwidth>", at which point the
@@ -825,7 +824,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
       storage.setBlockReportCount(0);
     }
     heartbeatedSinceRegistration = false;
-    forceRegistration = false;
   }
 
   /**
@@ -908,14 +906,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
       return false;
     }
     return true;
-  }
-
-  public void setForceRegistration(boolean force) {
-    forceRegistration = force;
-  }
-
-  public boolean isRegistered() {
-    return isAlive() && !forceRegistration;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 802bb76..0828d91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1412,7 +1412,7 @@ public class DatanodeManager {
         throw new DisallowedDatanodeException(nodeinfo);
       }
 
-      if (nodeinfo == null || !nodeinfo.isRegistered()) {
+      if (nodeinfo == null || !nodeinfo.isAlive()) {
         return new DatanodeCommand[]{RegisterCommand.REGISTER};
       }
       heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
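----------------------------------------------------------------------
For context: the heart of the reverted HDFS-9198 change is the
queue-and-batch pattern deleted from BlockManager above. IPC handlers
enqueue IBR work, and a single daemon thread drains the queue, batching
as many operations as possible under one namesystem write lock, bounded
by a maximum lock-hold time. Below is a minimal standalone sketch of
that pattern only; BatchingProcessor, writeLock, and MAX_LOCK_HOLD_MS
are illustrative stand-ins, not the actual Hadoop classes.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative stand-in for BlockManager.BlockReportProcessingThread.
class BatchingProcessor extends Thread {
  private static final long MAX_LOCK_HOLD_MS = 4;   // cap on lock hold time
  private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1024);
  private final ReentrantLock writeLock = new ReentrantLock(); // stands in for the namesystem write lock
  private volatile boolean running = true;

  BatchingProcessor() {
    super("Batching processor");
    setDaemon(true);
  }

  void enqueue(Runnable action) throws InterruptedException {
    queue.put(action); // blocks when full, applying back-pressure to callers
  }

  void shutdown() {
    running = false;
    interrupt();
  }

  @Override
  public void run() {
    while (running) {
      try {
        Runnable action = queue.take();            // wait for the first op
        writeLock.lock();                          // take the lock once...
        try {
          long start = System.nanoTime();
          do {                                     // ...and batch ops under it
            action.run();
            if ((System.nanoTime() - start) / 1_000_000 > MAX_LOCK_HOLD_MS) {
              break;                               // don't starve other lock users
            }
            action = queue.poll();                 // drain without blocking
          } while (action != null);
        } finally {
          writeLock.unlock();
        }
      } catch (InterruptedException e) {
        // fall through and re-check running
      }
    }
    queue.clear();
  }
}

Batching amortizes lock acquisition across many small IBRs, which is
what the BlockOpsBatched metric removed later in this diff counted.
----------------------------------------------------------------------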
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index d4658e5..216d6d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -151,7 +151,7 @@ public class DatanodeStorageInfo {
     this.state = s.getState();
   }
 
-  public int getBlockReportCount() {
+  int getBlockReportCount() {
     return blockReportCount;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index b1f936b..4fd9ca8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -934,7 +934,7 @@ public final class CacheManager {
     try {
       final DatanodeDescriptor datanode = 
           blockManager.getDatanodeManager().getDatanode(datanodeID);
-      if (datanode == null || !datanode.isRegistered()) {
+      if (datanode == null || !datanode.isAlive()) {
         throw new IOException(
             "processCacheReport from dead or unregistered datanode: " +
             datanode);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 7a5e99b..a1fed93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -36,7 +36,6 @@ import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Callable;
 
 import com.google.common.collect.Lists;
 
@@ -1359,9 +1358,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // DatanodeProtocol
-  public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
-      String poolId, final StorageBlockReport[] reports,
-      final BlockReportContext context) throws IOException {
+  public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
+      String poolId, StorageBlockReport[] reports,
+      BlockReportContext context) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     if(blockStateChangeLog.isDebugEnabled()) {
@@ -1377,14 +1376,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
       // for the same node and storage, so the value returned by the last
       // call of this loop is the final updated value for noStaleStorage.
       //
-      final int index = r;
-      noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
-        @Override
-        public Boolean call() throws IOException {
-          return bm.processReport(nodeReg, reports[index].getStorage(),
-              blocks, context, (index == reports.length - 1));
-        }
-      });
+      noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
+          blocks, context, (r == reports.length - 1));
       metrics.incrStorageBlockReportOps();
     }
     BlockManagerFaultInjector.getInstance().
@@ -1414,9 +1407,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // DatanodeProtocol
-  public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
-      String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
-          throws IOException {
+  public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
+      StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     metrics.incrBlockReceivedAndDeletedOps();
@@ -1425,22 +1417,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
           +"from "+nodeReg+" "+receivedAndDeletedBlocks.length
           +" blocks.");
     }
-    final BlockManager bm = namesystem.getBlockManager();
-    for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
-      bm.enqueueBlockOp(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            namesystem.processIncrementalBlockReport(nodeReg, r);
-          } catch (Exception ex) {
-            // usually because the node is unregistered/dead.  next heartbeat
-            // will correct the problem
-            blockStateChangeLog.error(
-                "*BLOCK* NameNode.blockReceivedAndDeleted: "
-                    + "failed from " + nodeReg + ": " + ex.getMessage());
-          }
-        }
-      });
+    for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
+      namesystem.processIncrementalBlockReport(nodeReg, r);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
index 54b5c6e..31bc164 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
@@ -76,10 +76,6 @@ public class NameNodeMetrics {
   MutableCounterLong blockReceivedAndDeletedOps;
   @Metric("Number of blockReports from individual storages")
   MutableCounterLong storageBlockReportOps;
-  @Metric("Number of blockReports and blockReceivedAndDeleted queued")
-  MutableGaugeInt blockOpsQueued;
-  @Metric("Number of blockReports and blockReceivedAndDeleted batch processed")
-  MutableCounterLong blockOpsBatched;
 
   @Metric("Number of file system operations")
   public long totalFileOps(){
@@ -271,14 +267,6 @@ public class NameNodeMetrics {
     storageBlockReportOps.incr();
   }
 
-  public void setBlockOpsQueued(int size) {
-    blockOpsQueued.set(size);
-  }
-
-  public void addBlockOpsBatched(int count) {
-    blockOpsBatched.incr(count);
-  }
-
   public void addTransaction(long latency) {
     transactions.add(latency);
   }
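----------------------------------------------------------------------
For context: the two metrics removed just above were the only externally
visible signal of IBR queueing and batching; the deleted testAsyncIBR
further down in this diff reads them via the standard test helpers. A
rough sketch of that check follows; IbrMetricsCheck and
assertIbrBatchingObserved are illustrative names, and it assumes a
MiniDFSCluster is already running so the "NameNodeActivity" record exists.

import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.MetricsAsserts;

public class IbrMetricsCheck {
  // Samples the NameNode's metrics record and asserts that some block
  // ops were queued and coalesced (mirrors the removed testAsyncIBR).
  static void assertIbrBatchingObserved() {
    MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
    int queued = MetricsAsserts.getIntGauge("BlockOpsQueued", rb);
    long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
    assertTrue("no block ops were queued", queued > 0);
    assertTrue("no block ops were batched", batched > 0);
  }
}
----------------------------------------------------------------------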
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
index ccac99f..ee9fa4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
@@ -20,32 +20,21 @@ package org.apache.hadoop.hdfs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.junit.Test;
 
-import com.google.common.base.Supplier;
-
 import java.net.InetSocketAddress;
 import java.security.Permission;
-import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.doReturn;
@@ -319,131 +308,4 @@ public class TestDatanodeRegistration {
       }
     }
   }
-
-  // IBRs are async operations to free up IPC handlers.  This means the IBR
-  // response will not contain non-IPC level exceptions - which in practice
-  // should not occur other than dead/unregistered node which will trigger a
-  // re-registration.  If a non-IPC exception does occur, the safety net is
-  // a forced re-registration on the next heartbeat.
-  @Test(timeout=10000)
-  public void testForcedRegistration() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 4);
-    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
-        Integer.MAX_VALUE);
-
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    cluster.getHttpUri(0);
-    FSNamesystem fsn = cluster.getNamesystem();
-    String bpId = fsn.getBlockPoolId();
-
-    DataNode dn = cluster.getDataNodes().get(0);
-    DatanodeDescriptor dnd =
-        NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
-    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
-    DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
-
-    // registration should not change after heartbeat.
-    assertTrue(dnd.isRegistered());
-    DatanodeRegistration lastReg = dn.getDNRegistrationForBP(bpId);
-    waitForHeartbeat(dn, dnd);
-    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
-
-    // force a re-registration on next heartbeat.
-    dnd.setForceRegistration(true);
-    assertFalse(dnd.isRegistered());
-    waitForHeartbeat(dn, dnd);
-    assertTrue(dnd.isRegistered());
-    DatanodeRegistration newReg = dn.getDNRegistrationForBP(bpId);
-    assertNotSame(lastReg, newReg);
-    lastReg = newReg;
-
-    // registration should not change on subsequent heartbeats.
-    waitForHeartbeat(dn, dnd);
-    assertTrue(dnd.isRegistered());
-    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
-    assertTrue(waitForBlockReport(dn, dnd));
-    assertTrue(dnd.isRegistered());
-    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
-
-    // check that block report is not processed and registration didn't change.
-    dnd.setForceRegistration(true);
-    assertFalse(waitForBlockReport(dn, dnd));
-    assertFalse(dnd.isRegistered());
-    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
-
-    // heartbeat should trigger re-registration, and next block report should
-    // not change registration.
-    waitForHeartbeat(dn, dnd);
-    assertTrue(dnd.isRegistered());
-    newReg = dn.getDNRegistrationForBP(bpId);
-    assertNotSame(lastReg, newReg);
-    lastReg = newReg;
-    assertTrue(waitForBlockReport(dn, dnd));
-    assertTrue(dnd.isRegistered());
-    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
-
-    // registration doesn't change.
-    ExtendedBlock eb = new ExtendedBlock(bpId, 1234);
-    dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
-    DataNodeTestUtils.triggerDeletionReport(dn);
-    assertTrue(dnd.isRegistered());
-    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
-
-    // a failed IBR will effectively unregister the node.
-    boolean failed = false;
-    try {
-      // pass null to cause a failure since there aren't any easy failure
-      // modes since it shouldn't happen.
-      fsn.processIncrementalBlockReport(lastReg, null);
-    } catch (NullPointerException npe) {
-      failed = true;
-    }
-    assertTrue("didn't fail", failed);
-    assertFalse(dnd.isRegistered());
-
-    // should remain unregistered until next heartbeat.
-    dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
-    DataNodeTestUtils.triggerDeletionReport(dn);
-    assertFalse(dnd.isRegistered());
-    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
-    waitForHeartbeat(dn, dnd);
-    assertTrue(dnd.isRegistered());
-    assertNotSame(lastReg, dn.getDNRegistrationForBP(bpId));
-  }
-
-  private void waitForHeartbeat(final DataNode dn, final DatanodeDescriptor dnd)
-      throws Exception {
-    final long lastUpdate = dnd.getLastUpdateMonotonic();
-    Thread.sleep(1);
-    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
-    DataNodeTestUtils.triggerHeartbeat(dn);
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        return lastUpdate != dnd.getLastUpdateMonotonic();
-      }
-    }, 10, 100000);
-    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
-  }
-
-  private boolean waitForBlockReport(final DataNode dn,
-      final DatanodeDescriptor dnd) throws Exception {
-    final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
-    final long lastCount = storage.getBlockReportCount();
-    dn.triggerBlockReport(
-        new BlockReportOptions.Factory().setIncremental(false).build());
-    try {
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          return lastCount != storage.getBlockReportCount();
-        }
-      }, 10, 100);
-    } catch (TimeoutException te) {
-      return false;
-    }
-    return true;
-  }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 1cebac1..5c874ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -35,20 +34,8 @@ import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
@@ -73,12 +60,8 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.MetricsAsserts;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -848,158 +831,4 @@ public class TestBlockManager {
     Assert.assertFalse(policyDefault.useDelHint(delHint, null, moreThan1Racks,
         null, excessTypes));
   }
-
-  @Test
-  public void testBlockReportQueueing() throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
-    try {
-      cluster.waitActive();
-      final FSNamesystem fsn = cluster.getNamesystem();
-      final BlockManager bm = fsn.getBlockManager();
-      final ExecutorService executor = Executors.newCachedThreadPool();
-
-      final CyclicBarrier startBarrier = new CyclicBarrier(2);
-      final CountDownLatch endLatch = new CountDownLatch(3);
-
-      // create a task intended to block while processing, thus causing
-      // the queue to backup.  simulates how a full BR is processed.
-      FutureTask<?> blockingOp = new FutureTask<Void>(
-          new Callable<Void>(){
-            @Override
-            public Void call() throws IOException {
-              return bm.runBlockOp(new Callable<Void>() {
-                @Override
-                public Void call()
-                    throws InterruptedException, BrokenBarrierException {
-                  // use a barrier to control the blocking.
-                  startBarrier.await();
-                  endLatch.countDown();
-                  return null;
-                }
-              });
-            }
-          });
-
-      // create an async task.  simulates how an IBR is processed.
-      Callable<?> asyncOp = new Callable<Void>(){
-        @Override
-        public Void call() throws IOException {
-          bm.enqueueBlockOp(new Runnable() {
-            @Override
-            public void run() {
-              // use the latch to signal if the op has run.
-              endLatch.countDown();
-            }
-          });
-          return null;
-        }
-      };
-
-      // calling get forces its execution so we can test if it's blocked.
-      Future<?> blockedFuture = executor.submit(blockingOp);
-      boolean isBlocked = false;
-      try {
-        // wait 1s for the future to block.  it should run instantaneously.
-        blockedFuture.get(1, TimeUnit.SECONDS);
-      } catch (TimeoutException te) {
-        isBlocked = true;
-      }
-      assertTrue(isBlocked);
-
-      // should effectively return immediately since calls are queued.
-      // however they should be backed up in the queue behind the blocking
-      // operation.
-      executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
-      executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
-
-      // check the async calls are queued, and first is still blocked.
-      assertEquals(2, bm.getBlockOpQueueLength());
-      assertFalse(blockedFuture.isDone());
-
-      // unblock the queue, wait for last op to complete, check the blocked
-      // call has returned
-      startBarrier.await(1, TimeUnit.SECONDS);
-      assertTrue(endLatch.await(1, TimeUnit.SECONDS));
-      assertEquals(0, bm.getBlockOpQueueLength());
-      assertTrue(blockingOp.isDone());
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  // spam the block manager with IBRs to verify queuing is occurring.
-  @Test
-  public void testAsyncIBR() throws Exception {
-    Logger.getRootLogger().setLevel(Level.WARN);
-
-    // will create files with many small blocks.
-    final int blkSize = 4*1024;
-    final int fileSize = blkSize * 100;
-    final byte[] buf = new byte[2*blkSize];
-    final int numWriters = 4;
-    final int repl = 3;
-
-    final CyclicBarrier barrier = new CyclicBarrier(numWriters);
-    final CountDownLatch writeLatch = new CountDownLatch(numWriters);
-    final AtomicBoolean failure = new AtomicBoolean();
-
-    final Configuration conf = new HdfsConfiguration();
-    conf.getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, blkSize);
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(8).build();
-
-    try {
-      cluster.waitActive();
-      // create multiple writer threads to create a file with many blocks.
-      // will test that concurrent writing causes IBR batching in the NN
-      Thread[] writers = new Thread[numWriters];
-      for (int i=0; i < writers.length; i++) {
-        final Path p = new Path("/writer"+i);
-        writers[i] = new Thread(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              FileSystem fs = cluster.getFileSystem();
-              FSDataOutputStream os =
-                  fs.create(p, true, buf.length, (short)repl, blkSize);
-              // align writers for maximum chance of IBR batching.
-              barrier.await();
-              int remaining = fileSize;
-              while (remaining > 0) {
-                os.write(buf);
-                remaining -= buf.length;
-              }
-              os.close();
-            } catch (Exception e) {
-              e.printStackTrace();
-              failure.set(true);
-            }
-            // let main thread know we are done.
-            writeLatch.countDown();
-          }
-        });
-        writers[i].start();
-      }
-
-      // when and how many IBRs are queued is indeterminate, so just watch
-      // the metrics and verify something was queued at during execution.
-      boolean sawQueued = false;
-      while (!writeLatch.await(10, TimeUnit.MILLISECONDS)) {
-        assertFalse(failure.get());
-        MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
-        long queued = MetricsAsserts.getIntGauge("BlockOpsQueued", rb);
-        sawQueued |= (queued > 0);
-      }
-      assertFalse(failure.get());
-      assertTrue(sawQueued);
-
-      // verify that batching of the IBRs occurred.
-      MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
-      long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
-      assertTrue(batched > 0);
-    } finally {
-      cluster.shutdown();
-    }
-  }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
index b5b0cf2..3d399a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
@@ -304,8 +304,7 @@ public class TestPendingReplication {
           reportDnNum++;
         }
       }
-      // IBRs are async, make sure the NN processes all of them.
-      cluster.getNamesystem().getBlockManager().flushBlockOps();
+
       assertEquals(DATANODE_COUNT - 3,
           blkManager.pendingReplications.getNumReplicas(blocks[0]));
 
@@ -323,7 +322,6 @@ public class TestPendingReplication {
         }
       }
 
-      cluster.getNamesystem().getBlockManager().flushBlockOps();
       assertEquals(DATANODE_COUNT - 3,
           blkManager.pendingReplications.getNumReplicas(blocks[0]));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 0a57005..c4a2d06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -113,13 +113,9 @@ public abstract class BlockReportTestBase {
 
   @After
   public void shutDownCluster() throws IOException {
-    if (fs != null) {
-      fs.close();
-    }
-    if (cluster != null) {
-      cluster.shutdownDataNodes();
-      cluster.shutdown();
-    }
+    fs.close();
+    cluster.shutdownDataNodes();
+    cluster.shutdown();
   }
 
   protected static void resetConfiguration() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
index 0801701..989e216 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.UUID;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -186,9 +187,7 @@ public class TestIncrementalBrVariations {
     }
 
     // Make sure that the deleted block from each storage was picked up
-    // by the NameNode.  IBRs are async, make sure the NN processes
-    // all of them.
-    cluster.getNamesystem().getBlockManager().flushBlockOps();
+    // by the NameNode.
     assertThat(cluster.getNamesystem().getMissingBlocksCount(),
         is((long) reports.length));
   }
@@ -257,8 +256,7 @@ public class TestIncrementalBrVariations {
     // Send the report to the NN.
     cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
 
-    // IBRs are async, make sure the NN processes all of them.
-    cluster.getNamesystem().getBlockManager().flushBlockOps();
+
     // Make sure that the NN has learned of the new storage.
     DatanodeStorageInfo storageInfo = cluster.getNameNode()
         .getNamesystem()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index bfd026c..c5262d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -100,14 +100,14 @@ public class TestDeadDatanode {
         null) };
     StorageReceivedDeletedBlocks[] storageBlocks = {
         new StorageReceivedDeletedBlocks(reg.getDatanodeUuid(), blocks) };
-
-    // Ensure blockReceived call from dead datanode is not rejected with
-    // IOException, since it's async, but the node remains unregistered.
-    dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
-    BlockManager bm = cluster.getNamesystem().getBlockManager();
-    // IBRs are async, make sure the NN processes all of them.
-    bm.flushBlockOps();
-    assertFalse(bm.getDatanodeManager().getDatanode(reg).isRegistered());
+
+    // Ensure blockReceived call from dead datanode is rejected with IOException
+    try {
+      dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
+      fail("Expected IOException is not thrown");
+    } catch (IOException ex) {
+      // Expected
+    }
 
     // Ensure blockReport from dead datanode is rejected with IOException
    StorageBlockReport[] report = { new StorageBlockReport(
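----------------------------------------------------------------------
For context: together with the BlockManager queue, the revert removes
runBlockOp, which kept full block reports synchronous over the same
async queue by wrapping each one in a FutureTask and blocking the IPC
handler until the queue thread had run it. A generic, standalone
illustration of that sync-over-async idiom follows, reusing the
BatchingProcessor sketch shown earlier; SyncOverAsync and runAndWait
are illustrative names, not Hadoop APIs.

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class SyncOverAsync {
  // Illustrative stand-in for BlockManager.runBlockOp: submit the work to
  // the processing queue, then block the caller until it has actually run.
  static <T> T runAndWait(BatchingProcessor processor, Callable<T> work)
      throws IOException, InterruptedException {
    FutureTask<T> future = new FutureTask<>(work);
    processor.enqueue(future);            // FutureTask is a Runnable
    try {
      return future.get();                // caller blocks; queue thread runs it
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      throw (cause instanceof IOException)
          ? (IOException) cause : new IOException(cause);
    }
  }
}

The removed flushBlockOps test helper was this same idiom with a no-op
Callable: returning from it proved every previously queued IBR had been
processed, which is why the reverted tests called it before asserting
on NameNode state.
----------------------------------------------------------------------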