HDFS-8967. Create a BlockManagerLock class to represent the lock used in the BlockManager. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1ecc3300
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1ecc3300
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1ecc3300

Branch: refs/heads/HDFS-8966
Commit: 1ecc3300552f3f54192fa7e8cbd647cc8b589fe5
Parents: 56e4f62
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Oct 6 23:35:24 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Oct 27 14:46:09 2015 -0700

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    | 124 ++++++++++++-------
 .../blockmanagement/BlockManagerLock.java       |  50 ++++++++
 .../CacheReplicationMonitor.java                |   9 +-
 .../server/blockmanagement/DatanodeManager.java |  10 +-
 .../blockmanagement/DecommissionManager.java    |   4 +-
 .../blockmanagement/HeartbeatManager.java       |   8 +-
 .../hdfs/server/namenode/CacheManager.java      |   4 +-
 .../hdfs/server/namenode/FSNamesystem.java      |   5 +
 .../hadoop/hdfs/server/namenode/Namesystem.java |   4 +
 .../blockmanagement/BlockManagerTestUtil.java   |  15 ++-
 .../blockmanagement/TestBlockManager.java       |  27 ++--
 .../blockmanagement/TestDatanodeManager.java    |   2 +
 .../blockmanagement/TestReplicationPolicy.java  |  22 +++-
 .../datanode/TestDataNodeVolumeFailure.java     |   3 +-
 14 files changed, 200 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
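
For context: this change makes BlockManager implement the RwLock interface and route all of its locking through a new BlockManagerLock class, which for now simply wraps the FSNamesystem's coarse ReentrantReadWriteLock. Callers inside block management switch from namesystem.writeLock()/writeUnlock() to the BlockManager's own lock methods. The standalone sketch below is not part of the patch; it only demonstrates the ReentrantReadWriteLock hold-count idiom that the new hasReadLock()/hasWriteLock() helpers are built on:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative only: the hold-count checks BlockManagerLock relies on.
    public class HoldCountDemo {
      public static void main(String[] args) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        lock.readLock().lock();
        try {
          // getReadHoldCount() counts this thread's reentrant read holds.
          System.out.println(lock.getReadHoldCount() > 0);         // true
          System.out.println(lock.isWriteLockedByCurrentThread()); // false
        } finally {
          lock.readLock().unlock();
        }

        lock.writeLock().lock();
        try {
          // A writer subsumes read access, which is why hasReadLock() is
          // defined as hasWriteLock() || getReadHoldCount() > 0.
          System.out.println(lock.isWriteLockedByCurrentThread()); // true
        } finally {
          lock.writeLock().unlock();
        }
      }
    }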


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5f55ece..4c96a3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
 
+import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -112,7 +113,7 @@ import org.slf4j.LoggerFactory;
  * Keeps information related to the blocks stored in the Hadoop cluster.
  */
 @InterfaceAudience.Private
-public class BlockManager implements BlockStatsMXBean {
+public class BlockManager implements RwLock, BlockStatsMXBean {
 
   public static final Logger LOG = LoggerFactory.getLogger(BlockManager.class);
   public static final Logger blockLog = NameNode.blockStateChangeLog;
@@ -125,6 +126,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   private final Namesystem namesystem;
 
+  private final BlockManagerLock lock;
   private final DatanodeManager datanodeManager;
   private final HeartbeatManager heartbeatManager;
   private final BlockTokenSecretManager blockTokenSecretManager;
@@ -302,6 +304,7 @@ public class BlockManager implements BlockStatsMXBean {
   public BlockManager(final Namesystem namesystem, final Configuration conf)
     throws IOException {
     this.namesystem = namesystem;
+    this.lock = new BlockManagerLock(namesystem);
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
 
@@ -518,7 +521,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Dump meta data to out. */
   public void metaSave(PrintWriter out) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
     final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
     datanodeManager.fetchDatanodes(live, dead, false);
@@ -550,7 +553,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Dump all datanodes
     getDatanodeManager().datanodeDump(out);
   }
-  
+
   /**
    * Dump the metadata for the given block in a human-readable
    * form.
@@ -579,12 +582,12 @@ public class BlockManager implements BlockStatsMXBean {
       out.print(fileName + ": ");
     }
     // l: == live:, d: == decommissioned c: == corrupt e: == excess
-    out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
+    out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
               " (replicas:" +
               " l: " + numReplicas.liveReplicas() +
               " d: " + numReplicas.decommissionedAndDecommissioning() +
               " c: " + numReplicas.corruptReplicas() +
-              " e: " + numReplicas.excessReplicas() + ") "); 
+              " e: " + numReplicas.excessReplicas() + ") ");
 
     Collection<DatanodeDescriptor> corruptNodes = 
                                   corruptReplicas.getNodes(block);
@@ -955,7 +958,7 @@ public class BlockManager implements BlockStatsMXBean {
       final boolean inSnapshot, FileEncryptionInfo feInfo,
       ErasureCodingPolicy ecPolicy)
       throws IOException {
-    assert namesystem.hasReadLock();
+    assert hasReadLock();
     if (blocks == null) {
       return null;
     } else if (blocks.length == 0) {
@@ -989,6 +992,41 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  @Override
+  public void readLock() {
+    lock.readLock().lock();
+  }
+
+  @Override
+  public void readUnlock() {
+    lock.readLock().unlock();
+  }
+
+  @Override
+  public boolean hasReadLock() {
+    return lock.hasReadLock();
+  }
+
+  @Override
+  public boolean hasWriteLock() {
+    return lock.hasWriteLock();
+  }
+
+  @Override
+  public void writeLock() {
+    lock.writeLock().lock();
+  }
+
+  @Override
+  public void writeLockInterruptibly() throws InterruptedException {
+    lock.writeLock().lockInterruptibly();
+  }
+
+  @Override
+  public void writeUnlock() {
+    lock.writeLock().unlock();
+  }
+
   /** @return current access keys. */
   public ExportedBlockKeys getBlockKeys() {
     return isBlockTokenEnabled()? blockTokenSecretManager.exportKeys()
@@ -1104,12 +1142,12 @@ public class BlockManager implements BlockStatsMXBean {
   public BlocksWithLocations getBlocks(DatanodeID datanode, long size
       ) throws IOException {
     namesystem.checkOperation(OperationCategory.READ);
-    namesystem.readLock();
+    readLock();
     try {
       namesystem.checkOperation(OperationCategory.READ);
       return getBlocksWithLocations(datanode, size);  
     } finally {
-      namesystem.readUnlock();
+      readUnlock();
     }
   }
 
@@ -1172,7 +1210,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Remove the blocks associated to the given DatanodeStorageInfo. */
   void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     while(it.hasNext()) {
@@ -1249,7 +1287,7 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
      final DatanodeInfo dn, String storageID, String reason) throws IOException {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     final Block reportedBlock = blk.getLocalBlock();
     final BlockInfo storedBlock = getStoredBlock(reportedBlock);
     if (storedBlock == null) {
@@ -1427,13 +1465,13 @@ public class BlockManager implements BlockStatsMXBean {
    */
   int computeBlockRecoveryWork(int blocksToProcess) {
     List<List<BlockInfo>> blocksToReplicate = null;
-    namesystem.writeLock();
+    writeLock();
     try {
       // Choose the blocks to be replicated
       blocksToReplicate = neededReplications
           .chooseUnderReplicatedBlocks(blocksToProcess);
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
     return computeRecoveryWorkForBlocks(blocksToReplicate);
   }
@@ -1451,7 +1489,7 @@ public class BlockManager implements BlockStatsMXBean {
     List<BlockRecoveryWork> recovWork = new LinkedList<>();
 
     // Step 1: categorize at-risk blocks into replication and EC tasks
-    namesystem.writeLock();
+    writeLock();
     try {
       synchronized (neededReplications) {
         for (int priority = 0; priority < blocksToRecover.size(); priority++) {
@@ -1464,7 +1502,7 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
 
     // Step 2: choose target nodes for each recovery task
@@ -1486,7 +1524,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // Step 3: add tasks to the DN
-    namesystem.writeLock();
+    writeLock();
     try {
       for(BlockRecoveryWork rw : recovWork){
         final DatanodeStorageInfo[] targets = rw.getTargets();
@@ -1502,7 +1540,7 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
 
     if (blockLog.isInfoEnabled()) {
@@ -1876,7 +1914,7 @@ public class BlockManager implements BlockStatsMXBean {
   private void processPendingReplications() {
     BlockInfo[] timedOutItems = pendingReplications.getTimedOutBlocks();
     if (timedOutItems != null) {
-      namesystem.writeLock();
+      writeLock();
       try {
         for (int i = 0; i < timedOutItems.length; i++) {
           /*
@@ -1894,7 +1932,7 @@ public class BlockManager implements BlockStatsMXBean {
           }
         }
       } finally {
-        namesystem.writeUnlock();
+        writeUnlock();
       }
       /* If we know the target datanodes where the replication timedout,
        * we could invoke decBlocksScheduled() on it. Its ok for now.
@@ -1903,7 +1941,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
-    assert namesystem.hasReadLock();
+    assert hasReadLock();
     DatanodeDescriptor node = null;
     try {
       node = datanodeManager.getDatanode(nodeReg);
@@ -1964,7 +2002,7 @@ public class BlockManager implements BlockStatsMXBean {
       final DatanodeStorage storage,
       final BlockListAsLongs newReport, BlockReportContext context,
       boolean lastStorageInRpc) throws IOException {
-    namesystem.writeLock();
+    writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
     DatanodeDescriptor node;
@@ -2040,7 +2078,7 @@ public class BlockManager implements BlockStatsMXBean {
       }
     } finally {
       endTime = Time.monotonicNow();
-      namesystem.writeUnlock();
+      writeUnlock();
     }
 
     if (invalidatedBlocks != null) {
@@ -2067,7 +2105,7 @@ public class BlockManager implements BlockStatsMXBean {
     LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
             "longer exists on the DataNode.",
         Long.toHexString(context.getReportId()), zombie.getStorageID());
-    assert(namesystem.hasWriteLock());
+    assert hasWriteLock();
     Iterator<BlockInfo> iter = zombie.getBlockIterator();
     int prevBlocks = zombie.numBlocks();
     while (iter.hasNext()) {
@@ -2101,7 +2139,7 @@ public class BlockManager implements BlockStatsMXBean {
     long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
     long startPostponedMisReplicatedBlocksCount =
         getPostponedMisreplicatedBlocksCount();
-    namesystem.writeLock();
+    writeLock();
     try {
       // blocksPerRescan is the configured number of blocks per rescan.
       // Randomly select blocksPerRescan consecutive blocks from the HashSet
@@ -2154,7 +2192,7 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
       long endPostponedMisReplicatedBlocksCount =
           getPostponedMisreplicatedBlocksCount();
       LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
@@ -2216,7 +2254,7 @@ public class BlockManager implements BlockStatsMXBean {
       BlockInfo block,
       long oldGenerationStamp, long oldNumBytes, 
       DatanodeStorageInfo[] newStorages) throws IOException {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     BlockToMarkCorrupt b = null;
     if (block.getGenerationStamp() != oldGenerationStamp) {
       b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
@@ -2264,7 +2302,7 @@ public class BlockManager implements BlockStatsMXBean {
       final DatanodeStorageInfo storageInfo,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;
-    assert (namesystem.hasWriteLock());
+    assert (hasWriteLock());
     assert (storageInfo.getBlockReportCount() == 0);
 
     for (BlockReportReplica iblk : report) {
@@ -2702,7 +2740,7 @@ public class BlockManager implements BlockStatsMXBean {
   private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
       DatanodeStorageInfo storageInfo)
   throws IOException {
-    assert (storedBlock != null && namesystem.hasWriteLock());
+    assert (storedBlock != null && hasWriteLock());
     if (!namesystem.isInStartupSafeMode()
         || isPopulatingReplQueues()) {
       addStoredBlock(storedBlock, reported, storageInfo, null, false);
@@ -2737,7 +2775,7 @@ public class BlockManager implements BlockStatsMXBean {
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
-    assert block != null && namesystem.hasWriteLock();
+    assert block != null && hasWriteLock();
     BlockInfo storedBlock;
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     if (!block.isComplete()) {
@@ -2899,7 +2937,7 @@ public class BlockManager implements BlockStatsMXBean {
    * over or under replicated. Place it into the respective queue.
    */
   public void processMisReplicatedBlocks() {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     stopReplicationInitializer();
     neededReplications.clear();
     replicationQueuesInitializer = new Daemon() {
@@ -2956,7 +2994,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
       int processed = 0;
-      namesystem.writeLockInterruptibly();
+      writeLockInterruptibly();
       try {
         while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
           BlockInfo block = blocksItr.next();
@@ -3010,7 +3048,7 @@ public class BlockManager implements BlockStatsMXBean {
           break;
         }
       } finally {
-        namesystem.writeUnlock();
+        writeUnlock();
         // Make sure it is out of the write lock for sufficiently long time.
         Thread.sleep(sleepDuration);
       }
@@ -3108,7 +3146,7 @@ public class BlockManager implements BlockStatsMXBean {
   private void processOverReplicatedBlock(final BlockInfo block,
       final short replication, final DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     if (addedNode == delNodeHint) {
       delNodeHint = null;
     }
@@ -3146,7 +3184,7 @@ public class BlockManager implements BlockStatsMXBean {
       BlockInfo storedBlock, short replication,
       DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     // first form a rack to datanodes map and
     BlockCollection bc = getBlockCollection(storedBlock);
     if (storedBlock.isStriped()) {
@@ -3284,7 +3322,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     LightWeightHashSet<BlockInfo> excessBlocks = excessReplicateMap.get(
         dn.getDatanodeUuid());
     if (excessBlocks == null) {
@@ -3315,7 +3353,7 @@ public class BlockManager implements BlockStatsMXBean {
    */
  public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
     blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
-    assert (namesystem.hasWriteLock());
+    assert hasWriteLock();
     {
       if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
         blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
@@ -3492,7 +3530,7 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public void processIncrementalBlockReport(final DatanodeID nodeID,
       final StorageReceivedDeletedBlocks srdb) throws IOException {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     int received = 0;
     int deleted = 0;
     int receiving = 0;
@@ -3700,7 +3738,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   public void removeBlock(BlockInfo block) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     // No need to ACK blocks that are being removed entirely
     // from the namespace, since the removal of the associated
     // file already removes them from the block map below.
@@ -3734,7 +3772,7 @@ public class BlockManager implements BlockStatsMXBean {
   /** updates a block in under replication queue */
   private void updateNeededReplications(final BlockInfo block,
       final int curReplicasDelta, int expectedReplicasDelta) {
-    namesystem.writeLock();
+    writeLock();
     try {
       if (!isPopulatingReplQueues()) {
         return;
@@ -3752,7 +3790,7 @@ public class BlockManager implements BlockStatsMXBean {
             repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
   }
 
@@ -3814,7 +3852,7 @@ public class BlockManager implements BlockStatsMXBean {
   private int invalidateWorkForOneNode(DatanodeInfo dn) {
     final List<Block> toInvalidate;
     
-    namesystem.writeLock();
+    writeLock();
     try {
       // blocks should not be replicated or removed if safe mode is on
       if (namesystem.isInSafeMode()) {
@@ -3838,7 +3876,7 @@ public class BlockManager implements BlockStatsMXBean {
         return 0;
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
    blockLog.debug("BLOCK* {}: ask {} to delete {}", getClass().getSimpleName(),
         dn, toInvalidate);
@@ -4041,12 +4079,12 @@ public class BlockManager implements BlockStatsMXBean {
     int workFound = this.computeBlockRecoveryWork(blocksToProcess);
 
     // Update counters
-    namesystem.writeLock();
+    writeLock();
     try {
       this.updateState();
       this.scheduledReplicationBlocksCount = workFound;
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
     workFound += this.computeInvalidateWork(nodesToProcess);
     return workFound;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java
new file mode 100644
index 0000000..18dc201
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+class BlockManagerLock implements ReadWriteLock {
+  private final ReentrantReadWriteLock coarseLock;
+
+  BlockManagerLock(Namesystem ns) {
+    this.coarseLock = ns.getLockImplementation();
+  }
+
+  @Override
+  public Lock readLock() {
+    return coarseLock.readLock();
+  }
+
+  @Override
+  public Lock writeLock() {
+    return coarseLock.writeLock();
+  }
+
+  boolean hasReadLock() {
+    return hasWriteLock() || coarseLock.getReadHoldCount() > 0;
+  }
+
+  boolean hasWriteLock() {
+    return coarseLock.isWriteLockedByCurrentThread();
+  }
+}
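
Since BlockManagerLock implements the standard java.util.concurrent.locks.ReadWriteLock interface, BlockManager's RwLock methods reduce to one-line delegations and callers keep the familiar lock/try/finally idiom. A minimal usage sketch, assuming a hypothetical caller (the method below is a placeholder, not code from the patch):

    // Hypothetical caller, mirroring the pattern used throughout this patch:
    // acquire through the BlockManager rather than through the Namesystem.
    void processSomething(BlockManager bm) {
      bm.writeLock();
      try {
        assert bm.hasWriteLock();
        // ... mutate block-management state under the coarse lock ...
      } finally {
        bm.writeUnlock();
      }
    }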

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 2f81ddf..54bcffd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -55,7 +55,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-;
 
 /**
  * Scans the namesystem, scheduling blocks to be cached as appropriate.
@@ -218,7 +217,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * after are not atomic.
    */
   public void waitForRescanIfNeeded() {
-    Preconditions.checkArgument(!namesystem.hasWriteLock(),
+    Preconditions.checkArgument(!blockManager.hasWriteLock(),
         "Must not hold the FSN write lock when waiting for a rescan.");
     Preconditions.checkArgument(lock.isHeldByCurrentThread(),
         "Must hold the CRM lock when waiting for a rescan.");
@@ -263,7 +262,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    */
   @Override
   public void close() throws IOException {
-    Preconditions.checkArgument(namesystem.hasWriteLock());
+    Preconditions.checkArgument(blockManager.hasWriteLock());
     lock.lock();
     try {
       if (shutdown) return;
@@ -285,7 +284,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
     scannedDirectives = 0;
     scannedBlocks = 0;
     try {
-      namesystem.writeLock();
+      blockManager.writeLock();
       try {
         lock.lock();
         if (shutdown) {
@@ -302,7 +301,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       rescanCachedBlockMap();
       blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
     } finally {
-      namesystem.writeUnlock();
+      blockManager.writeUnlock();
     }
   }
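
The flipped precondition in waitForRescanIfNeeded() preserves an important invariant: a thread must not block waiting for a rescan while it holds the coarse write lock, because the rescan thread needs that same lock to finish, so waiting would deadlock. A minimal sketch of that guard style, with illustrative names (only Preconditions and the java.util.concurrent.locks classes are real APIs here):

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    import com.google.common.base.Preconditions;

    // Illustrative: refuse to wait while the coarse write lock is held,
    // since the rescan thread must take that lock to complete its scan.
    class RescanWaiter {
      private final ReentrantLock crmLock = new ReentrantLock();
      private final Condition scanFinished = crmLock.newCondition();

      void waitForRescan(BlockManager bm) throws InterruptedException {
        Preconditions.checkArgument(!bm.hasWriteLock(),
            "Must not hold the coarse write lock when waiting for a rescan.");
        crmLock.lock();
        try {
          scanFinished.await(); // real code loops until a scan count advances
        } finally {
          crmLock.unlock();
        }
      }
    }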
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index e30bc2a..77544c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -540,7 +540,7 @@ public class DatanodeManager {
    * @param nodeInfo datanode descriptor.
    */
   private void removeDatanode(DatanodeDescriptor nodeInfo) {
-    assert namesystem.hasWriteLock();
+    assert blockManager.hasWriteLock();
     heartbeatManager.removeDatanode(nodeInfo);
     blockManager.removeBlocksAssociatedTo(nodeInfo);
     networktopology.remove(nodeInfo);
@@ -559,7 +559,7 @@ public class DatanodeManager {
    */
   public void removeDatanode(final DatanodeID node
       ) throws UnregisteredNodeException {
-    namesystem.writeLock();
+    blockManager.writeLock();
     try {
       final DatanodeDescriptor descriptor = getDatanode(node);
       if (descriptor != null) {
@@ -569,7 +569,7 @@ public class DatanodeManager {
                                      + node + " does not exist");
       }
     } finally {
-      namesystem.writeUnlock();
+      blockManager.writeUnlock();
     }
   }
 
@@ -993,12 +993,12 @@ public class DatanodeManager {
    */
   public void refreshNodes(final Configuration conf) throws IOException {
     refreshHostsReader(conf);
-    namesystem.writeLock();
+    blockManager.writeLock();
     try {
       refreshDatanodes();
       countSoftwareVersions();
     } finally {
-      namesystem.writeUnlock();
+      blockManager.writeUnlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 42810350..5ce5a83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -366,12 +366,12 @@ public class DecommissionManager {
       numBlocksChecked = 0;
       numNodesChecked = 0;
       // Check decom progress
-      namesystem.writeLock();
+      blockManager.writeLock();
       try {
         processPendingNodes();
         check();
       } finally {
-        namesystem.writeUnlock();
+        blockManager.writeUnlock();
       }
       if (numBlocksChecked + numNodesChecked > 0) {
         LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index d0369aa..24ad6dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -351,20 +351,20 @@ class HeartbeatManager implements DatanodeStatistics {
       }
       if (dead != null) {
         // acquire the fsnamesystem lock, and then remove the dead node.
-        namesystem.writeLock();
+        blockManager.writeLock();
         try {
           dm.removeDeadDatanode(dead);
         } finally {
-          namesystem.writeUnlock();
+          blockManager.writeUnlock();
         }
       }
       if (failedStorage != null) {
         // acquire the fsnamesystem lock, and remove blocks on the storage.
-        namesystem.writeLock();
+        blockManager.writeLock();
         try {
           blockManager.removeBlocksAssociatedTo(failedStorage);
         } finally {
-          namesystem.writeUnlock();
+          blockManager.writeUnlock();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index 4fd9ca8..3e2443b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -928,7 +928,7 @@ public final class CacheManager {
 
   public final void processCacheReport(final DatanodeID datanodeID,
       final List<Long> blockIds) throws IOException {
-    namesystem.writeLock();
+    blockManager.writeLock();
     final long startTime = Time.monotonicNow();
     final long endTime;
     try {
@@ -942,7 +942,7 @@ public final class CacheManager {
       processCacheReportImpl(datanode, blockIds);
     } finally {
       endTime = Time.monotonicNow();
-      namesystem.writeUnlock();
+      blockManager.writeUnlock();
     }
 
     // Log the block report processing stats from Namenode perspective

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 2753270..cb79ded 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -6293,6 +6293,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return haContext;
   }
 
+  @Override
+  public ReentrantReadWriteLock getLockImplementation() {
+    return fsLock.coarseLock;
+  }
+
   @Override  // NameNodeMXBean
   public String getCorruptFiles() {
     List<String> list = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index b1012c2..89fe678 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /** Namesystem operations. */
 @InterfaceAudience.Private
 public interface Namesystem extends RwLock, SafeMode {
@@ -67,4 +69,6 @@ public interface Namesystem extends RwLock, SafeMode {
   CacheManager getCacheManager();
 
   HAContext getHAContext();
+
+  ReentrantReadWriteLock getLockImplementation();
 }
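
Note that getLockImplementation() exposes the concrete ReentrantReadWriteLock rather than the ReadWriteLock interface, because BlockManagerLock needs the hold-count queries that only the concrete class provides. Both FSNamesystem and BlockManagerLock then delegate to the same instance, so taking the lock through either object is equivalent. A standalone sketch of that sharing (the Wrapper class is illustrative, not from the patch):

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative: two independent wrappers over one shared lock instance,
    // mirroring how FSNamesystem and BlockManagerLock now share the lock.
    public class SharedLockDemo {
      static class Wrapper {
        private final ReentrantReadWriteLock coarse;
        Wrapper(ReentrantReadWriteLock coarse) { this.coarse = coarse; }
        Lock writeLock() { return coarse.writeLock(); }
        boolean hasWriteLock() { return coarse.isWriteLockedByCurrentThread(); }
      }

      public static void main(String[] args) {
        ReentrantReadWriteLock shared = new ReentrantReadWriteLock();
        Wrapper fsnView = new Wrapper(shared);
        Wrapper bmView = new Wrapper(shared);

        fsnView.writeLock().lock();
        try {
          // A write lock taken through one view is visible through the other.
          System.out.println(bmView.hasWriteLock()); // true
        } finally {
          fsnView.writeLock().unlock();
        }
      }
    }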

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 64d80bd..1ae6c2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -172,10 +172,10 @@ public class BlockManagerTestUtil {
    * @param dnName the name of the DataNode
    */
   public static void noticeDeadDatanode(NameNode nn, String dnName) {
-    FSNamesystem namesystem = nn.getNamesystem();
-    namesystem.writeLock();
+    final BlockManager bm = nn.getNamesystem().getBlockManager();
+    bm.writeLock();
     try {
-      DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+      DatanodeManager dnm = bm.getDatanodeManager();
       HeartbeatManager hbm = dnm.getHeartbeatManager();
       DatanodeDescriptor[] dnds = hbm.getDatanodes();
       DatanodeDescriptor theDND = null;
@@ -191,7 +191,7 @@ public class BlockManagerTestUtil {
         hbm.heartbeatCheck();
       }
     } finally {
-      namesystem.writeUnlock();
+      bm.writeUnlock();
     }
   }
   
@@ -220,18 +220,17 @@ public class BlockManagerTestUtil {
    * Call heartbeat check function of HeartbeatManager and get
    * under replicated blocks count within write lock to make sure
    * computeDatanodeWork doesn't interfere.
-   * @param namesystem the FSNamesystem
    * @param bm the BlockManager to manipulate
    * @return the number of under replicated blocks
    */
   public static int checkHeartbeatAndGetUnderReplicatedBlocksCount(
-      FSNamesystem namesystem, BlockManager bm) {
-    namesystem.writeLock();
+      BlockManager bm) {
+    bm.writeLock();
     try {
       bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
       return bm.getUnderReplicatedNotMissingBlocks();
     } finally {
-      namesystem.writeUnlock();
+      bm.writeUnlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 16d482e..76b4f14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
@@ -34,6 +35,7 @@ import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,12 +65,12 @@ import org.junit.Assert;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Lists;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestBlockManager {
   private DatanodeStorageInfo[] storages;
@@ -97,11 +99,16 @@ public class TestBlockManager {
     Configuration conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
              "need to set a dummy value here so it assumes a multi-rack 
cluster");
-    fsn = Mockito.mock(FSNamesystem.class);
-    Mockito.doReturn(true).when(fsn).hasWriteLock();
-    Mockito.doReturn(true).when(fsn).hasReadLock();
-    Mockito.doReturn(true).when(fsn).isRunning();
+    fsn = mock(FSNamesystem.class);
+    doReturn(true).when(fsn).isRunning();
+    Lock lockImpl = mock(Lock.class);
+    BlockManagerLock lock = mock(BlockManagerLock.class);
     bm = new BlockManager(fsn, conf);
+    Whitebox.setInternalState(bm, "lock", lock);
+    doReturn(true).when(lock).hasWriteLock();
+    doReturn(true).when(lock).hasReadLock();
+    doReturn(lockImpl).when(lock).readLock();
+    doReturn(lockImpl).when(lock).writeLock();
     final String[] racks = {
         "/rackA",
         "/rackA",
@@ -438,9 +445,9 @@ public class TestBlockManager {
   
  private BlockInfo addBlockOnNodes(long blockId, List<DatanodeDescriptor> nodes) {
     long inodeId = ++mockINodeId;
-    BlockCollection bc = Mockito.mock(BlockCollection.class);
-    Mockito.doReturn(inodeId).when(bc).getId();
-    Mockito.doReturn(bc).when(fsn).getBlockCollection(inodeId);
+    BlockCollection bc = mock(BlockCollection.class);
+    doReturn(inodeId).when(bc).getId();
+    doReturn(bc).when(fsn).getBlockCollection(inodeId);
     BlockInfo blockInfo = blockOnNodes(blockId, nodes);
 
     blockInfo.setReplication((short) 3);
@@ -749,7 +756,7 @@ public class TestBlockManager {
     Block block = new Block(blkId);
     BlockInfo blockInfo =
         new BlockInfoContiguous(block, (short) 3);
-    BlockCollection bc = Mockito.mock(BlockCollection.class);
+    BlockCollection bc = mock(BlockCollection.class);
     long inodeId = ++mockINodeId;
     doReturn(inodeId).when(bc).getId();
     bm.blocksMap.addBlockCollection(blockInfo, bc);
@@ -761,7 +768,7 @@ public class TestBlockManager {
     Block block = new Block(blkId);
     BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
     blockInfo.convertToBlockUnderConstruction(UNDER_CONSTRUCTION, null);
-    BlockCollection bc = Mockito.mock(BlockCollection.class);
+    BlockCollection bc = mock(BlockCollection.class);
     long inodeId = ++mockINodeId;
     doReturn(inodeId).when(bc).getId();
     bm.blocksMap.addBlockCollection(blockInfo, bc);
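
The setUp() change above swaps BlockManager's real lock for a Mockito mock through Whitebox.setInternalState, so every lock/unlock call becomes a no-op and the hold checks report true, letting assert-guarded BlockManager code run single-threaded without a real FSNamesystem lock. Condensed into one hypothetical helper (the method name is illustrative):

    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.mock;

    import java.util.concurrent.locks.Lock;
    import org.mockito.internal.util.reflection.Whitebox;

    // Sketch of the injection pattern used by these tests.
    static void injectPermissiveLock(BlockManager bm) {
      BlockManagerLock lock = mock(BlockManagerLock.class);
      Lock noOp = mock(Lock.class);       // mocked lock()/unlock() do nothing
      doReturn(true).when(lock).hasReadLock();
      doReturn(true).when(lock).hasWriteLock();
      doReturn(noOp).when(lock).readLock();
      doReturn(noOp).when(lock).writeLock();
      Whitebox.setInternalState(bm, "lock", lock); // replace the private field
    }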

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index b55a716..8bad600 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.util.Shell;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import static org.hamcrest.core.Is.is;
@@ -70,6 +71,7 @@ public class TestDatanodeManager {
   private static DatanodeManager mockDatanodeManager(
       FSNamesystem fsn, Configuration conf) throws IOException {
     BlockManager bm = Mockito.mock(BlockManager.class);
+    Mockito.doReturn(true).when(bm).hasWriteLock();
     BlockReportLeaseManager blm = new BlockReportLeaseManager(conf);
     Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm);
     DatanodeManager dm = new DatanodeManager(bm, fsn, conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index ef73001..4e744b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -66,6 +67,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.internal.util.reflection.Whitebox;
 
 @RunWith(Parameterized.class)
 public class TestReplicationPolicy extends BaseReplicationPolicyTest {
@@ -1190,9 +1192,11 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   public void testAddStoredBlockDoesNotCauseSkippedReplication()
       throws IOException {
     Namesystem mockNS = mock(Namesystem.class);
-    when(mockNS.hasWriteLock()).thenReturn(true);
-    when(mockNS.hasReadLock()).thenReturn(true);
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    BlockManagerLock lock = mock(BlockManagerLock.class);
+    when(lock.hasWriteLock()).thenReturn(true);
+    when(lock.hasReadLock()).thenReturn(true);
+    Whitebox.setInternalState(bm, "lock", lock);
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
     BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
@@ -1239,9 +1243,12 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
       testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
           throws IOException {
     Namesystem mockNS = mock(Namesystem.class);
-    when(mockNS.hasReadLock()).thenReturn(true);
-
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    BlockManagerLock lock = mock(BlockManagerLock.class);
+    Lock impl = mock(Lock.class);
+    when(lock.hasReadLock()).thenReturn(true);
+    when(lock.writeLock()).thenReturn(impl);
+    Whitebox.setInternalState(bm, "lock", lock);
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
     long blkID1 = ThreadLocalRandom.current().nextLong();
@@ -1311,9 +1318,12 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
       throws IOException {
     Namesystem mockNS = mock(Namesystem.class);
-    when(mockNS.hasReadLock()).thenReturn(true);
-
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    BlockManagerLock lock = mock(BlockManagerLock.class);
+    Lock impl = mock(Lock.class);
+    when(lock.hasReadLock()).thenReturn(true);
+    when(lock.writeLock()).thenReturn(impl);
+    Whitebox.setInternalState(bm, "lock", lock);
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
     BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecc3300/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 2c4fcc5..98519c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -393,8 +393,7 @@ public class TestDataNodeVolumeFailure {
 
     // underReplicatedBlocks are due to failed volumes
     int underReplicatedBlocks =
-        BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(
-            cluster.getNamesystem(), bm);
+        BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(bm);
     assertTrue("There is no under replicated block after volume failure",
         underReplicatedBlocks > 0);
   }
