Author: umamahesh
Date: Thu Aug 14 04:21:11 2014
New Revision: 1617873

URL: http://svn.apache.org/r1617873
Log:
Merge. HDFS-6783. Fix HDFS CacheReplicationMonitor rescan logic. Contributed by Yi Liu.

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1617873&r1=1617872&r2=1617873&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug 14 04:21:11 2014 @@ -248,6 +248,8 @@ Release 2.6.0 - UNRELEASED HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a block replica (Arpit Agarwal) + HDFS-6783. Fix HDFS CacheReplicationMonitor rescan logic. (Yi Liu via umamahesh) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1617873&r1=1617872&r2=1617873&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Thu Aug 14 04:21:11 2014 @@ -104,21 +104,21 @@ public class CacheReplicationMonitor ext private final Condition scanFinished; /** - * Whether there are pending 
CacheManager operations that necessitate a - * CacheReplicationMonitor rescan. Protected by the CRM lock. + * The number of rescans completed. Used to wait for scans to finish. + * Protected by the CacheReplicationMonitor lock. */ - private boolean needsRescan = true; + private long completedScanCount = 0; /** - * Whether we are currently doing a rescan. Protected by the CRM lock. + * The scan we're currently performing, or -1 if no scan is in progress. + * Protected by the CacheReplicationMonitor lock. */ - private boolean isScanning = false; + private long curScanCount = -1; /** - * The number of rescans completed. Used to wait for scans to finish. - * Protected by the CacheReplicationMonitor lock. + * The number of rescans we need to complete. Protected by the CRM lock. */ - private long scanCount = 0; + private long neededScanCount = 0; /** * True if this monitor should terminate. Protected by the CRM lock. @@ -169,7 +169,7 @@ public class CacheReplicationMonitor ext LOG.info("Shutting down CacheReplicationMonitor"); return; } - if (needsRescan) { + if (completedScanCount < neededScanCount) { LOG.info("Rescanning because of pending operations"); break; } @@ -182,8 +182,6 @@ public class CacheReplicationMonitor ext doRescan.await(delta, TimeUnit.MILLISECONDS); curTimeMs = Time.monotonicNow(); } - isScanning = true; - needsRescan = false; } finally { lock.unlock(); } @@ -194,8 +192,8 @@ public class CacheReplicationMonitor ext // Update synchronization-related variables. 
lock.lock(); try { - isScanning = false; - scanCount++; + completedScanCount = curScanCount; + curScanCount = -1; scanFinished.signalAll(); } finally { lock.unlock(); @@ -226,16 +224,15 @@ public class CacheReplicationMonitor ext "Must not hold the FSN write lock when waiting for a rescan."); Preconditions.checkArgument(lock.isHeldByCurrentThread(), "Must hold the CRM lock when waiting for a rescan."); - if (!needsRescan) { + if (neededScanCount <= completedScanCount) { return; } // If no scan is already ongoing, mark the CRM as dirty and kick - if (!isScanning) { + if (curScanCount < 0) { doRescan.signal(); } // Wait until the scan finishes and the count advances - final long startCount = scanCount; - while ((!shutdown) && (startCount >= scanCount)) { + while ((!shutdown) && (completedScanCount < neededScanCount)) { try { scanFinished.await(); } catch (InterruptedException e) { @@ -253,7 +250,14 @@ public class CacheReplicationMonitor ext public void setNeedsRescan() { Preconditions.checkArgument(lock.isHeldByCurrentThread(), "Must hold the CRM lock when setting the needsRescan bit."); - this.needsRescan = true; + if (curScanCount >= 0) { + // If there is a scan in progress, we need to wait for the scan after + // that. + neededScanCount = curScanCount + 1; + } else { + // If there is no scan in progress, we need to wait for the next scan. + neededScanCount = completedScanCount + 1; + } } /** @@ -284,10 +288,17 @@ public class CacheReplicationMonitor ext scannedBlocks = 0; namesystem.writeLock(); try { + lock.lock(); if (shutdown) { throw new InterruptedException("CacheReplicationMonitor was " + "shut down."); } + curScanCount = completedScanCount + 1; + } + finally { + lock.unlock(); + } + try { resetStatistics(); rescanCacheDirectives(); rescanCachedBlockMap();