HDFS-13947. Review of DirectoryScanner Class. Contributed by BELUGA BEHR.
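
For context, the two user-visible changes in this patch are that the
DirectoryScanner constructor no longer takes a DataNode argument, and that
the per-second throttle on report compiler threads is now disabled by
default (-1 instead of 1000 ms/s). With a limit of, say, 100 ms/s, a
compiler thread that has used its 100 ms slice sleeps for the remaining
900 ms of that second, plus any time it ran over the slice. A minimal
wiring sketch follows (the ScannerWiring class is hypothetical); it assumes
an FsDatasetSpi instance is already in hand and, because start() is
package-private, that the caller lives in the datanode package, as DataNode
itself does:

    package org.apache.hadoop.hdfs.server.datanode;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

    class ScannerWiring {
      static DirectoryScanner startScanner(FsDatasetSpi<?> dataset) {
        Configuration conf = new Configuration();
        // Throttling is off by default now; opt in explicitly (ms per second).
        conf.setInt(DFSConfigKeys
            .DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, 100);
        // The DataNode parameter is gone from the constructor.
        DirectoryScanner scanner = new DirectoryScanner(dataset, conf);
        scanner.start(); // first run lands at a random offset within the period
        return scanner;
      }
    }
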
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1dc0adfa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1dc0adfa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1dc0adfa

Branch: refs/heads/HDFS-12943
Commit: 1dc0adfac0ee4821c67366728c70be9b59477b0f
Parents: 7051bd7
Author: Inigo Goiri <inigo...@apache.org>
Authored: Wed Oct 3 11:19:57 2018 -0700
Committer: Inigo Goiri <inigo...@apache.org>
Committed: Wed Oct 3 11:19:57 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |   2 +-
 .../hdfs/server/datanode/DirectoryScanner.java  | 601 ++++++++++---------
 .../server/datanode/TestDirectoryScanner.java   | 243 ++++----
 .../fsdataset/impl/TestProvidedImpl.java        |  13 +-
 .../namenode/TestListCorruptFileBlocks.java     |  50 +-
 6 files changed, 481 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index d8024dc..42709de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -709,7 +709,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
       "dfs.datanode.directoryscan.throttle.limit.ms.per.sec";
   public static final int
-      DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = 1000;
+      DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = -1;
   public static final String  DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
   public static final String  DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
   public static final String  DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 270e30b..40f80a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1086,7 +1086,7 @@ public class DataNode extends ReconfigurableBase
       reason = "verifcation is not supported by SimulatedFSDataset";
     }
     if (reason == null) {
-      directoryScanner = new DirectoryScanner(this, data, conf);
+      directoryScanner = new DirectoryScanner(data, conf);
       directoryScanner.start();
     } else {
       LOG.info("Periodic Directory Tree Verification scan " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index 99584d9..484899d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -17,17 +17,19 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.FilenameFilter; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -36,23 +38,27 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.lang3.time.FastDateFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo; +import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.StopWatch; -import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; /** * Periodically scans the data directories for block and block metadata files. 
@@ -62,48 +68,48 @@ import org.apache.hadoop.util.Time; public class DirectoryScanner implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(DirectoryScanner.class); - private static final int MILLIS_PER_SECOND = 1000; - private static final String START_MESSAGE = - "Periodic Directory Tree Verification scan" - + " starting at %s with interval of %dms"; - private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE - + " and throttle limit of %dms/s"; + + private static final int DEFAULT_MAP_SIZE = 32768; private final FsDatasetSpi<?> dataset; private final ExecutorService reportCompileThreadPool; private final ScheduledExecutorService masterThread; private final long scanPeriodMsecs; - private final int throttleLimitMsPerSec; - private volatile boolean shouldRun = false; + private final long throttleLimitMsPerSec; + private final AtomicBoolean shouldRun = new AtomicBoolean(); + private boolean retainDiffs = false; - private final DataNode datanode; /** * Total combined wall clock time (in milliseconds) spent by the report - * compiler threads executing. Used for testing purposes. + * compiler threads executing. Used for testing purposes. */ @VisibleForTesting final AtomicLong timeRunningMs = new AtomicLong(0L); + /** * Total combined wall clock time (in milliseconds) spent by the report - * compiler threads blocked by the throttle. Used for testing purposes. + * compiler threads blocked by the throttle. Used for testing purposes. */ @VisibleForTesting final AtomicLong timeWaitingMs = new AtomicLong(0L); + /** * The complete list of block differences indexed by block pool ID. */ @VisibleForTesting - final ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool(); + final BlockPoolReport diffs = new BlockPoolReport(); + /** - * Statistics about the block differences in each blockpool, indexed by - * block pool ID. + * Statistics about the block differences in each blockpool, indexed by block + * pool ID. */ @VisibleForTesting - final Map<String, Stats> stats = new HashMap<String, Stats>(); - + final Map<String, Stats> stats; + /** - * Allow retaining diffs for unit test and analysis. Defaults to false (off) + * Allow retaining diffs for unit test and analysis. Defaults to false (off). + * * @param b whether to retain diffs */ @VisibleForTesting @@ -123,92 +129,157 @@ public class DirectoryScanner implements Runnable { long missingMemoryBlocks = 0; long mismatchBlocks = 0; long duplicateBlocks = 0; - + /** * Create a new Stats object for the given blockpool ID. + * * @param bpid blockpool ID */ public Stats(String bpid) { this.bpid = bpid; } - + @Override public String toString() { - return "BlockPool " + bpid - + " Total blocks: " + totalBlocks + ", missing metadata files:" - + missingMetaFile + ", missing block files:" + missingBlockFile - + ", missing blocks in memory:" + missingMemoryBlocks - + ", mismatched blocks:" + mismatchBlocks; + return "BlockPool " + bpid + " Total blocks: " + totalBlocks + + ", missing metadata files: " + missingMetaFile + + ", missing block files: " + missingBlockFile + + ", missing blocks in memory: " + missingMemoryBlocks + + ", mismatched blocks: " + mismatchBlocks; } } /** * Helper class for compiling block info reports from report compiler threads. + * Contains a volume, a set of block pool IDs, and a collection of ScanInfo + * objects. If a block pool exists but has no ScanInfo objects associated with + * it, there will be no mapping for that particular block pool. 
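 * <p>
 * A minimal usage sketch (illustrative only, not part of this patch):
 * <pre>
 *   ScanInfoVolumeReport report = new ScanInfoVolumeReport(volume);
 *   report.addAll(bpid, scanInfos);      // register blocks found for a pool
 *   for (String id : report.getBlockPoolIds()) {
 *     List&lt;ScanInfo&gt; infos = report.getScanInfo(id);
 *   }
 * </pre>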
*/ - static class ScanInfoPerBlockPool extends - HashMap<String, LinkedList<ScanInfo>> { - + @VisibleForTesting + public static class ScanInfoVolumeReport { + + @SuppressWarnings("unused") private static final long serialVersionUID = 1L; + private final FsVolumeSpi volume; + + private final BlockPoolReport blockPoolReport; + /** * Create a new info list. + * + * @param volume */ - ScanInfoPerBlockPool() {super();} + ScanInfoVolumeReport(final FsVolumeSpi volume) { + this.volume = volume; + this.blockPoolReport = new BlockPoolReport(); + } /** * Create a new info list initialized to the given expected size. - * See {@link java.util.HashMap#HashMap(int)}. * * @param sz initial expected size */ - ScanInfoPerBlockPool(int sz) {super(sz);} - + ScanInfoVolumeReport(final FsVolumeSpi volume, + final Collection<String> blockPools) { + this.volume = volume; + this.blockPoolReport = new BlockPoolReport(blockPools); + } + + public void addAll(final String bpid, + final Collection<ScanInfo> scanInfos) { + this.blockPoolReport.addAll(bpid, scanInfos); + } + + public Set<String> getBlockPoolIds() { + return this.blockPoolReport.getBlockPoolIds(); + } + + public List<ScanInfo> getScanInfo(final String bpid) { + return this.blockPoolReport.getScanInfo(bpid); + } + + public FsVolumeSpi getVolume() { + return volume; + } + + @Override + public String toString() { + return "ScanInfoVolumeReport [volume=" + volume + ", blockPoolReport=" + + blockPoolReport + "]"; + } + } + + /** + * Helper class for compiling block info reports per block pool. + */ + @VisibleForTesting + public static class BlockPoolReport { + + @SuppressWarnings("unused") + private static final long serialVersionUID = 1L; + + private final Set<String> blockPools; + + private final ListMultimap<String, ScanInfo> map; + /** - * Merges {@code that} ScanInfoPerBlockPool into this one + * Create a block pool report. * - * @param that ScanInfoPerBlockPool to merge + * @param volume */ - public void addAll(ScanInfoPerBlockPool that) { - if (that == null) return; - - for (Entry<String, LinkedList<ScanInfo>> entry : that.entrySet()) { - String bpid = entry.getKey(); - LinkedList<ScanInfo> list = entry.getValue(); - - if (this.containsKey(bpid)) { - //merge that per-bpid linked list with this one - this.get(bpid).addAll(list); - } else { - //add that new bpid and its linked list to this - this.put(bpid, list); - } - } + BlockPoolReport() { + this.blockPools = new HashSet<>(2); + this.map = ArrayListMultimap.create(2, DEFAULT_MAP_SIZE); } - + /** - * Convert all the LinkedList values in this ScanInfoPerBlockPool map - * into sorted arrays, and return a new map of these arrays per blockpool + * Create a new block pool report initialized to the given expected size. 
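 * <p>
 * BlockPoolReport also backs the scanner's final diffs; an illustrative
 * sketch (not part of this patch):
 * <pre>
 *   BlockPoolReport bpReport = new BlockPoolReport();
 *   bpReport.addAll(bpid, scanInfos);
 *   bpReport.sortBlocks();               // sort each pool's list by blockId
 *   for (Map.Entry&lt;String, ScanInfo&gt; e : bpReport.getEntries()) {
 *     // block pool ID / ScanInfo pairs, as consumed by reconcile()
 *   }
 * </pre>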
* - * @return a map of ScanInfo arrays per blockpool + * @param blockPools initial list of known block pools */ - public Map<String, ScanInfo[]> toSortedArrays() { - Map<String, ScanInfo[]> result = - new HashMap<String, ScanInfo[]>(this.size()); - - for (Entry<String, LinkedList<ScanInfo>> entry : this.entrySet()) { - String bpid = entry.getKey(); - LinkedList<ScanInfo> list = entry.getValue(); - - // convert list to array - ScanInfo[] record = list.toArray(new ScanInfo[list.size()]); + BlockPoolReport(final Collection<String> blockPools) { + this.blockPools = new HashSet<>(blockPools); + this.map = ArrayListMultimap.create(blockPools.size(), DEFAULT_MAP_SIZE); + + } + + public void addAll(final String bpid, + final Collection<ScanInfo> scanInfos) { + this.blockPools.add(bpid); + this.map.putAll(bpid, scanInfos); + } + + public void sortBlocks() { + for (final String bpid : this.map.keySet()) { + final List<ScanInfo> list = this.map.get(bpid); // Sort array based on blockId - Arrays.sort(record); - result.put(bpid, record); + Collections.sort(list); } - return result; } - } + public Set<String> getBlockPoolIds() { + return Collections.unmodifiableSet(this.blockPools); + } + + public List<ScanInfo> getScanInfo(final String bpid) { + return this.map.get(bpid); + } + + public Collection<Map.Entry<String, ScanInfo>> getEntries() { + return Collections.unmodifiableCollection(this.map.entries()); + } + + public void clear() { + this.map.clear(); + this.blockPools.clear(); + } + + @Override + public String toString() { + return "BlockPoolReport [blockPools=" + blockPools + ", map=" + map + "]"; + } + } /** * Create a new directory scanner, but don't cycle it running yet. @@ -217,75 +288,58 @@ public class DirectoryScanner implements Runnable { * @param dataset the dataset to scan * @param conf the Configuration object */ - public DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, - Configuration conf) { - this.datanode = datanode; + public DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) { this.dataset = dataset; + this.stats = new HashMap<>(DEFAULT_MAP_SIZE); int interval = (int) conf.getTimeDuration( DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT, TimeUnit.SECONDS); - scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec - int throttle = - conf.getInt( - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT); + scanPeriodMsecs = TimeUnit.SECONDS.toMillis(interval); - if ((throttle > MILLIS_PER_SECOND) || (throttle <= 0)) { - if (throttle > MILLIS_PER_SECOND) { - LOG.error( - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY - + " set to value above 1000 ms/sec. Assuming default value of " + - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT); - } else { - LOG.error( - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY - + " set to value below 1 ms/sec. Assuming default value of " + - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT); - } + int throttle = conf.getInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT); - throttleLimitMsPerSec = + if (throttle >= TimeUnit.SECONDS.toMillis(1)) { + LOG.warn( + "{} set to value above 1000 ms/sec. 
Assuming default value of {}", + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT); + throttle = DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT; - } else { - throttleLimitMsPerSec = throttle; } - int threads = + throttleLimitMsPerSec = throttle; + + int threads = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT); + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT); - reportCompileThreadPool = Executors.newFixedThreadPool(threads, - new Daemon.DaemonFactory()); - masterThread = new ScheduledThreadPoolExecutor(1, - new Daemon.DaemonFactory()); + reportCompileThreadPool = + Executors.newFixedThreadPool(threads, new Daemon.DaemonFactory()); + + masterThread = + new ScheduledThreadPoolExecutor(1, new Daemon.DaemonFactory()); } /** - * Start the scanner. The scanner will run every + * Start the scanner. The scanner will run every * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds. */ void start() { - shouldRun = true; - long offset = ThreadLocalRandom.current().nextInt( - (int) (scanPeriodMsecs/MILLIS_PER_SECOND)) * MILLIS_PER_SECOND; //msec - long firstScanTime = Time.now() + offset; - String logMsg; - - if (throttleLimitMsPerSec < MILLIS_PER_SECOND) { - logMsg = String.format(START_MESSAGE_WITH_THROTTLE, - FastDateFormat.getInstance().format(firstScanTime), scanPeriodMsecs, - throttleLimitMsPerSec); - } else { - logMsg = String.format(START_MESSAGE, - FastDateFormat.getInstance().format(firstScanTime), scanPeriodMsecs); - } - - LOG.info(logMsg); - masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs, - TimeUnit.MILLISECONDS); + shouldRun.set(true); + long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs); + + LOG.info( + "Periodic Directory Tree Verification scan starting in {}ms with interval of {}ms and throttle limit of {}ms/s", + firstScanTime, scanPeriodMsecs, throttleLimitMsPerSec); + + masterThread.scheduleAtFixedRate(this, firstScanTime, scanPeriodMsecs, + TimeUnit.MILLISECONDS); } - + /** * Return whether the scanner has been started. * @@ -293,7 +347,7 @@ public class DirectoryScanner implements Runnable { */ @VisibleForTesting boolean getRunStatus() { - return shouldRun; + return shouldRun.get(); } /** @@ -305,67 +359,69 @@ public class DirectoryScanner implements Runnable { } /** - * Main program loop for DirectoryScanner. Runs {@link reconcile()} - * and handles any exceptions. + * Main program loop for DirectoryScanner. Runs {@link reconcile()} and + * handles any exceptions. 
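 * Exceptions are logged and swallowed so the scheduling executor runs the
 * scanner again next cycle, while Errors are re-thrown after logging, which
 * causes the ScheduledExecutorService to cancel the periodic task for good.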
*/ @Override public void run() { + if (!shouldRun.get()) { + // shutdown has been activated + LOG.warn( + "This cycle terminating immediately because 'shouldRun' has been deactivated"); + return; + } try { - if (!shouldRun) { - //shutdown has been activated - LOG.warn("this cycle terminating immediately because 'shouldRun' has been deactivated"); - return; - } - - //We're are okay to run - do it - reconcile(); - + reconcile(); } catch (Exception e) { - //Log and continue - allows Executor to run again next cycle - LOG.error("Exception during DirectoryScanner execution - will continue next cycle", e); + // Log and continue - allows Executor to run again next cycle + LOG.error( + "Exception during DirectoryScanner execution - will continue next cycle", + e); } catch (Error er) { - //Non-recoverable error - re-throw after logging the problem - LOG.error("System Error during DirectoryScanner execution - permanently terminating periodic scanner", er); + // Non-recoverable error - re-throw after logging the problem + LOG.error( + "System Error during DirectoryScanner execution - permanently terminating periodic scanner", + er); throw er; } } - + /** - * Stops the directory scanner. This method will wait for 1 minute for the + * Stops the directory scanner. This method will wait for 1 minute for the * main thread to exit and an additional 1 minute for the report compilation - * threads to exit. If a thread does not exit in that time period, it is - * left running, and an error is logged. + * threads to exit. If a thread does not exit in that time period, it is left + * running, and an error is logged. */ void shutdown() { - if (!shouldRun) { - LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started"); - } else { - LOG.warn("DirectoryScanner: shutdown has been called"); + LOG.info("Shutdown has been called"); + if (!shouldRun.getAndSet(false)) { + LOG.warn("Shutdown has been called, but periodic scanner not started"); + } + if (masterThread != null) { + masterThread.shutdown(); } - shouldRun = false; - if (masterThread != null) masterThread.shutdown(); - if (reportCompileThreadPool != null) { reportCompileThreadPool.shutdownNow(); } - if (masterThread != null) { try { masterThread.awaitTermination(1, TimeUnit.MINUTES); } catch (InterruptedException e) { - LOG.error("interrupted while waiting for masterThread to " + - "terminate", e); + LOG.error( + "interrupted while waiting for masterThread to " + "terminate", e); } } if (reportCompileThreadPool != null) { try { reportCompileThreadPool.awaitTermination(1, TimeUnit.MINUTES); } catch (InterruptedException e) { - LOG.error("interrupted while waiting for reportCompileThreadPool to " + - "terminate", e); + LOG.error("interrupted while waiting for reportCompileThreadPool to " + + "terminate", e); } } - if (!retainDiffs) clear(); + if (!retainDiffs) { + clear(); + } } /** @@ -374,45 +430,54 @@ public class DirectoryScanner implements Runnable { @VisibleForTesting public void reconcile() throws IOException { scan(); - for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) { - String bpid = entry.getKey(); - LinkedList<ScanInfo> diff = entry.getValue(); - - for (ScanInfo info : diff) { - dataset.checkAndUpdate(bpid, info); - } + + for (final Map.Entry<String, ScanInfo> entry : diffs.getEntries()) { + dataset.checkAndUpdate(entry.getKey(), entry.getValue()); + } + + if (!retainDiffs) { + clear(); } - if (!retainDiffs) clear(); } /** - * Scan for the differences between disk and in-memory blocks - * Scan only the 
"finalized blocks" lists of both disk and memory. + * Scan for the differences between disk and in-memory blocks Scan only the + * "finalized blocks" lists of both disk and memory. */ private void scan() { + BlockPoolReport blockPoolReport = new BlockPoolReport(); + clear(); - Map<String, ScanInfo[]> diskReport = getDiskReport(); + + Collection<ScanInfoVolumeReport> volumeReports = getVolumeReports(); + for (ScanInfoVolumeReport volumeReport : volumeReports) { + for (String blockPoolId : volumeReport.getBlockPoolIds()) { + List<ScanInfo> scanInfos = volumeReport.getScanInfo(blockPoolId); + blockPoolReport.addAll(blockPoolId, scanInfos); + } + } + + // Pre-sort the reports outside of the lock + blockPoolReport.sortBlocks(); // Hold FSDataset lock to prevent further changes to the block map - try(AutoCloseableLock lock = dataset.acquireDatasetLock()) { - for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) { - String bpid = entry.getKey(); - ScanInfo[] blockpoolReport = entry.getValue(); - + try (AutoCloseableLock lock = dataset.acquireDatasetLock()) { + for (final String bpid : blockPoolReport.getBlockPoolIds()) { + List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid); + Stats statsRecord = new Stats(bpid); stats.put(bpid, statsRecord); - LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>(); - diffs.put(bpid, diffRecord); - - statsRecord.totalBlocks = blockpoolReport.length; + Collection<ScanInfo> diffRecord = new ArrayList<>(); + + statsRecord.totalBlocks = blockpoolReport.size(); final List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid); Collections.sort(bl); // Sort based on blockId - + int d = 0; // index for blockpoolReport int m = 0; // index for memReprot - while (m < bl.size() && d < blockpoolReport.length) { + while (m < bl.size() && d < blockpoolReport.size()) { ReplicaInfo memBlock = bl.get(m); - ScanInfo info = blockpoolReport[d]; + ScanInfo info = blockpoolReport.get(d); if (info.getBlockId() < memBlock.getBlockId()) { if (!dataset.isDeletingBlock(bpid, info.getBlockId())) { // Block is missing in memory @@ -424,15 +489,15 @@ public class DirectoryScanner implements Runnable { } if (info.getBlockId() > memBlock.getBlockId()) { // Block is missing on the disk - addDifference(diffRecord, statsRecord, - memBlock.getBlockId(), info.getVolume()); + addDifference(diffRecord, statsRecord, memBlock.getBlockId(), + info.getVolume()); m++; continue; } // Block file and/or metadata file exists on the disk // Block exists in memory - if (info.getVolume().getStorageType() != StorageType.PROVIDED && - info.getBlockFile() == null) { + if (info.getVolume().getStorageType() != StorageType.PROVIDED + && info.getBlockFile() == null) { // Block metadata file exits and block file is missing addDifference(diffRecord, statsRecord, info); } else if (info.getGenStamp() != memBlock.getGenerationStamp() @@ -442,16 +507,16 @@ public class DirectoryScanner implements Runnable { statsRecord.mismatchBlocks++; addDifference(diffRecord, statsRecord, info); } else if (memBlock.compareWith(info) != 0) { - // volumeMap record and on-disk files don't match. + // volumeMap record and on-disk files do not match. statsRecord.duplicateBlocks++; addDifference(diffRecord, statsRecord, info); } d++; - if (d < blockpoolReport.length) { - // There may be multiple on-disk records for the same block, don't increment - // the memory record pointer if so. 
- ScanInfo nextInfo = blockpoolReport[d]; + if (d < blockpoolReport.size()) { + // There may be multiple on-disk records for the same block, do not + // increment the memory record pointer if so. + ScanInfo nextInfo = blockpoolReport.get(d); if (nextInfo.getBlockId() != info.getBlockId()) { ++m; } @@ -461,132 +526,108 @@ public class DirectoryScanner implements Runnable { } while (m < bl.size()) { ReplicaInfo current = bl.get(m++); - addDifference(diffRecord, statsRecord, - current.getBlockId(), current.getVolume()); + addDifference(diffRecord, statsRecord, current.getBlockId(), + current.getVolume()); } - while (d < blockpoolReport.length) { - if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) { + while (d < blockpoolReport.size()) { + if (!dataset.isDeletingBlock(bpid, + blockpoolReport.get(d).getBlockId())) { statsRecord.missingMemoryBlocks++; - addDifference(diffRecord, statsRecord, blockpoolReport[d]); + addDifference(diffRecord, statsRecord, blockpoolReport.get(d)); } d++; } - LOG.info(statsRecord.toString()); - } //end for - } //end synchronized + diffs.addAll(bpid, diffRecord); + LOG.info("Scan Results: {}", statsRecord); + } + } } /** * Add the ScanInfo object to the list of differences and adjust the stats - * accordingly. This method is called when a block is found on the disk, - * but the in-memory block is missing or does not match the block on the disk. + * accordingly. This method is called when a block is found on the disk, but + * the in-memory block is missing or does not match the block on the disk. * - * @param diffRecord the list to which to add the info + * @param diffRecord the collection to which to add the info * @param statsRecord the stats to update * @param info the differing info */ - private void addDifference(LinkedList<ScanInfo> diffRecord, - Stats statsRecord, ScanInfo info) { + private void addDifference(Collection<ScanInfo> diffRecord, Stats statsRecord, + ScanInfo info) { statsRecord.missingMetaFile += info.getMetaFile() == null ? 1 : 0; statsRecord.missingBlockFile += info.getBlockFile() == null ? 1 : 0; diffRecord.add(info); } /** - * Add a new ScanInfo object to the list of differences and adjust the stats - * accordingly. This method is called when a block is not found on the disk. + * Add a new ScanInfo object to the collection of differences and adjust the + * stats accordingly. This method is called when a block is not found on the + * disk. * - * @param diffRecord the list to which to add the info + * @param diffRecord the collection to which to add the info * @param statsRecord the stats to update * @param blockId the id of the missing block * @param vol the volume that contains the missing block */ - private void addDifference(LinkedList<ScanInfo> diffRecord, - Stats statsRecord, long blockId, - FsVolumeSpi vol) { + private void addDifference(Collection<ScanInfo> diffRecord, Stats statsRecord, + long blockId, FsVolumeSpi vol) { statsRecord.missingBlockFile++; statsRecord.missingMetaFile++; diffRecord.add(new ScanInfo(blockId, null, null, vol)); } /** - * Get the lists of blocks on the disks in the dataset, sorted by blockId. - * The returned map contains one entry per blockpool, keyed by the blockpool - * ID. - * - * @return a map of sorted arrays of block information + * Get the lists of blocks on the disks in the data set. 
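 * Volumes are scanned in parallel by the report compiler thread pool;
 * PROVIDED volumes are skipped to keep overhead low. An illustrative
 * consumer (mirroring what {@link #scan()} does):
 * <pre>
 *   for (ScanInfoVolumeReport report : getVolumeReports()) {
 *     for (String bpid : report.getBlockPoolIds()) {
 *       List&lt;ScanInfo&gt; infos = report.getScanInfo(bpid);
 *     }
 *   }
 * </pre>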
*/ @VisibleForTesting - public Map<String, ScanInfo[]> getDiskReport() { - ScanInfoPerBlockPool list = new ScanInfoPerBlockPool(); - ScanInfoPerBlockPool[] dirReports = null; + public Collection<ScanInfoVolumeReport> getVolumeReports() { + List<ScanInfoVolumeReport> volReports = new ArrayList<>(); + List<Future<ScanInfoVolumeReport>> compilersInProgress = new ArrayList<>(); + // First get list of data directories try (FsDatasetSpi.FsVolumeReferences volumes = dataset.getFsVolumeReferences()) { - // Use an array since the threads may return out of order and - // compilersInProgress#keySet may return out of order as well. - dirReports = new ScanInfoPerBlockPool[volumes.size()]; - - Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress = - new HashMap<Integer, Future<ScanInfoPerBlockPool>>(); - - for (int i = 0; i < volumes.size(); i++) { - if (volumes.get(i).getStorageType() == StorageType.PROVIDED) { - // Disable scanning PROVIDED volumes to keep overhead low - continue; + for (final FsVolumeSpi volume : volumes) { + // Disable scanning PROVIDED volumes to keep overhead low + if (volume.getStorageType() != StorageType.PROVIDED) { + ReportCompiler reportCompiler = new ReportCompiler(volume); + Future<ScanInfoVolumeReport> result = + reportCompileThreadPool.submit(reportCompiler); + compilersInProgress.add(result); } - ReportCompiler reportCompiler = - new ReportCompiler(datanode, volumes.get(i)); - Future<ScanInfoPerBlockPool> result = - reportCompileThreadPool.submit(reportCompiler); - compilersInProgress.put(i, result); } - for (Entry<Integer, Future<ScanInfoPerBlockPool>> report : - compilersInProgress.entrySet()) { - Integer index = report.getKey(); + for (Future<ScanInfoVolumeReport> future : compilersInProgress) { try { - dirReports[index] = report.getValue().get(); - - // If our compiler threads were interrupted, give up on this run - if (dirReports[index] == null) { - dirReports = null; + final ScanInfoVolumeReport result = future.get(); + if (!CollectionUtils.addIgnoreNull(volReports, result)) { + // This compiler thread were interrupted, give up on this run + volReports.clear(); break; } } catch (Exception ex) { - FsVolumeSpi fsVolumeSpi = volumes.get(index); - LOG.error("Error compiling report for the volume, StorageId: " - + fsVolumeSpi.getStorageID(), ex); - // Continue scanning the other volumes + LOG.warn("Error compiling report. Continuing.", ex); } } } catch (IOException e) { LOG.error("Unexpected IOException by closing FsVolumeReference", e); } - if (dirReports != null) { - // Compile consolidated report for all the volumes - for (ScanInfoPerBlockPool report : dirReports) { - if(report != null){ - list.addAll(report); - } - } - } - return list.toSortedArrays(); + + return volReports; } /** * The ReportCompiler class encapsulates the process of searching a datanode's - * disks for block information. It operates by performing a DFS of the - * volume to discover block information. + * disks for block information. It operates by performing a DFS of the volume + * to discover block information. * * When the ReportCompiler discovers block information, it create a new - * ScanInfo object for it and adds that object to its report list. The report + * ScanInfo object for it and adds that object to its report list. The report * list is returned by the {@link #call()} method. 
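 * <p>
 * Sketch of how the scanner drives this class (mirroring
 * getVolumeReports(), illustrative only):
 * <pre>
 *   Future&lt;ScanInfoVolumeReport&gt; future =
 *       reportCompileThreadPool.submit(new ReportCompiler(volume));
 *   ScanInfoVolumeReport result = future.get();  // null if interrupted
 * </pre>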
*/ - public class ReportCompiler implements Callable<ScanInfoPerBlockPool> { + public class ReportCompiler implements Callable<ScanInfoVolumeReport> { private final FsVolumeSpi volume; - private final DataNode datanode; // Variable for tracking time spent running for throttling purposes private final StopWatch throttleTimer = new StopWatch(); // Variable for tracking time spent running and waiting for testing @@ -594,13 +635,11 @@ public class DirectoryScanner implements Runnable { private final StopWatch perfTimer = new StopWatch(); /** - * Create a report compiler for the given volume on the given datanode. + * Create a report compiler for the given volume. * - * @param datanode the target datanode * @param volume the target volume */ - public ReportCompiler(DataNode datanode, FsVolumeSpi volume) { - this.datanode = datanode; + public ReportCompiler(FsVolumeSpi volume) { this.volume = volume; } @@ -608,12 +647,13 @@ public class DirectoryScanner implements Runnable { * Run this report compiler thread. * * @return the block info report list - * @throws IOException if the block pool isn't found + * @throws IOException if the block pool is not found */ @Override - public ScanInfoPerBlockPool call() throws IOException { + public ScanInfoVolumeReport call() throws IOException { String[] bpList = volume.getBlockPoolList(); - ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length); + ScanInfoVolumeReport result = + new ScanInfoVolumeReport(volume, Arrays.asList(bpList)); perfTimer.start(); throttleTimer.start(); for (String bpid : bpList) { @@ -623,33 +663,45 @@ public class DirectoryScanner implements Runnable { throttleTimer.reset().start(); try { - result.put(bpid, volume.compileReport(bpid, report, this)); + // ScanInfos are added directly to 'report' list + volume.compileReport(bpid, report, this); + result.addAll(bpid, report); } catch (InterruptedException ex) { // Exit quickly and flag the scanner to do the same result = null; break; } } + LOG.trace("Scanner volume report: {}", result); return result; } /** - * Called by the thread before each potential disk scan so that a pause - * can be optionally inserted to limit the number of scans per second. - * The limit is controlled by + * Called by the thread before each potential disk scan so that a pause can + * be optionally inserted to limit the number of scans per second. The limit + * is controlled by * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}. */ public void throttle() throws InterruptedException { accumulateTimeRunning(); - if ((throttleLimitMsPerSec < 1000) && - (throttleTimer.now(TimeUnit.MILLISECONDS) > throttleLimitMsPerSec)) { - - Thread.sleep(MILLIS_PER_SECOND - throttleLimitMsPerSec); - throttleTimer.reset().start(); + if (throttleLimitMsPerSec > 0L) { + final long runningTime = throttleTimer.now(TimeUnit.MILLISECONDS); + if (runningTime >= throttleLimitMsPerSec) { + final long sleepTime; + if (runningTime >= 1000L) { + LOG.warn("Unable to throttle within the second. 
Blocking for 1s."); + sleepTime = 1000L; + } else { + // Sleep for the expected time plus any time processing ran over + final long overTime = runningTime - throttleLimitMsPerSec; + sleepTime = (1000L - throttleLimitMsPerSec) + overTime; + } + Thread.sleep(sleepTime); + throttleTimer.reset().start(); + } + accumulateTimeWaiting(); } - - accumulateTimeWaiting(); } /** @@ -679,4 +731,5 @@ public class DirectoryScanner implements Runnable { || name.startsWith(Block.BLOCK_FILE_PREFIX); } } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index 312bc86..e29a147 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -33,6 +33,7 @@ import java.net.URI; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -44,26 +45,23 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; -import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics; -import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; +import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; @@ -73,14 +71,17 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.Time; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Tests {@link 
DirectoryScanner} handling of differences - * between blocks on the disk and block in memory. + * Tests {@link DirectoryScanner} handling of differences between blocks on the + * disk and block in memory. */ public class TestDirectoryScanner { private static final Logger LOG = @@ -102,7 +103,7 @@ public class TestDirectoryScanner { CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1); CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); CONF.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, - getMemlockLimit(Long.MAX_VALUE)); + getMemlockLimit(Long.MAX_VALUE)); } @Before @@ -110,21 +111,20 @@ public class TestDirectoryScanner { LazyPersistTestCase.initCacheManipulator(); } - /** create a file with a length of <code>fileLen</code> */ - private List<LocatedBlock> createFile(String fileNamePrefix, - long fileLen, - boolean isLazyPersist) throws IOException { + /** create a file with a length of <code>fileLen</code>. */ + private List<LocatedBlock> createFile(String fileNamePrefix, long fileLen, + boolean isLazyPersist) throws IOException { FileSystem fs = cluster.getFileSystem(); Path filePath = new Path("/" + fileNamePrefix + ".dat"); - DFSTestUtil.createFile( - fs, filePath, isLazyPersist, 1024, fileLen, + DFSTestUtil.createFile(fs, filePath, isLazyPersist, 1024, fileLen, BLOCK_LENGTH, (short) 1, r.nextLong(), false); - return client.getLocatedBlocks(filePath.toString(), 0, fileLen).getLocatedBlocks(); + return client.getLocatedBlocks(filePath.toString(), 0, fileLen) + .getLocatedBlocks(); } - /** Truncate a block file */ + /** Truncate a block file. */ private long truncateBlockFile() throws IOException { - try(AutoCloseableLock lock = fds.acquireDatasetLock()) { + try (AutoCloseableLock lock = fds.acquireDatasetLock()) { for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) { File f = new File(b.getBlockURI()); File mf = new File(b.getMetadataURI()); @@ -149,7 +149,7 @@ public class TestDirectoryScanner { /** Delete a block file */ private long deleteBlockFile() { - try(AutoCloseableLock lock = fds.acquireDatasetLock()) { + try (AutoCloseableLock lock = fds.acquireDatasetLock()) { for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) { File f = new File(b.getBlockURI()); File mf = new File(b.getMetadataURI()); @@ -165,7 +165,7 @@ public class TestDirectoryScanner { /** Delete block meta file */ private long deleteMetaFile() { - try(AutoCloseableLock lock = fds.acquireDatasetLock()) { + try (AutoCloseableLock lock = fds.acquireDatasetLock()) { for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) { // Delete a metadata file if (b.metadataExists() && b.deleteMetadata()) { @@ -179,11 +179,12 @@ public class TestDirectoryScanner { /** * Duplicate the given block on all volumes. 
+ * * @param blockId * @throws IOException */ private void duplicateBlock(long blockId) throws IOException { - try(AutoCloseableLock lock = fds.acquireDatasetLock()) { + try (AutoCloseableLock lock = fds.acquireDatasetLock()) { ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId); try (FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) { @@ -199,16 +200,14 @@ public class TestDirectoryScanner { URI destRoot = v.getStorageLocation().getUri(); String relativeBlockPath = - sourceRoot.relativize(sourceBlock.toURI()) - .getPath(); + sourceRoot.relativize(sourceBlock.toURI()).getPath(); String relativeMetaPath = - sourceRoot.relativize(sourceMeta.toURI()) - .getPath(); + sourceRoot.relativize(sourceMeta.toURI()).getPath(); - File destBlock = new File(new File(destRoot).toString(), - relativeBlockPath); - File destMeta = new File(new File(destRoot).toString(), - relativeMetaPath); + File destBlock = + new File(new File(destRoot).toString(), relativeBlockPath); + File destMeta = + new File(new File(destRoot).toString(), relativeMetaPath); destBlock.getParentFile().mkdirs(); FileUtils.copyFile(sourceBlock, destBlock); @@ -223,7 +222,7 @@ public class TestDirectoryScanner { } } - /** Get a random blockId that is not used already */ + /** Get a random blockId that is not used already. */ private long getFreeBlockId() { long id = rand.nextLong(); while (true) { @@ -244,14 +243,15 @@ public class TestDirectoryScanner { + Block.METADATA_EXTENSION; } - /** Create a block file in a random volume*/ + /** Create a block file in a random volume. */ private long createBlockFile() throws IOException { long id = getFreeBlockId(); - try (FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) { + try ( + FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) { int numVolumes = volumes.size(); int index = rand.nextInt(numVolumes - 1); - File finalizedDir = ((FsVolumeImpl) volumes.get(index)) - .getFinalizedDir(bpid); + File finalizedDir = + ((FsVolumeImpl) volumes.get(index)).getFinalizedDir(bpid); File file = new File(finalizedDir, getBlockFile(id)); if (file.createNewFile()) { LOG.info("Created block file " + file.getName()); @@ -260,14 +260,14 @@ public class TestDirectoryScanner { return id; } - /** Create a metafile in a random volume*/ + /** Create a metafile in a random volume */ private long createMetaFile() throws IOException { long id = getFreeBlockId(); try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) { int numVolumes = refs.size(); int index = rand.nextInt(numVolumes - 1); - File finalizedDir = ((FsVolumeImpl) refs.get(index)) - .getFinalizedDir(bpid); + File finalizedDir = + ((FsVolumeImpl) refs.get(index)).getFinalizedDir(bpid); File file = new File(finalizedDir, getMetaFile(id)); if (file.createNewFile()) { LOG.info("Created metafile " + file.getName()); @@ -276,7 +276,7 @@ public class TestDirectoryScanner { return id; } - /** Create block file and corresponding metafile in a rondom volume */ + /** Create block file and corresponding metafile in a rondom volume. 
*/ private long createBlockMetaFile() throws IOException { long id = getFreeBlockId(); @@ -318,7 +318,7 @@ public class TestDirectoryScanner { long missingBlockFile, long missingMemoryBlocks, long mismatchBlocks) throws IOException, InterruptedException, TimeoutException { scan(totalBlocks, diffsize, missingMetaFile, missingBlockFile, - missingMemoryBlocks, mismatchBlocks, 0); + missingMemoryBlocks, mismatchBlocks, 0); } private void scan(long totalBlocks, int diffsize, long missingMetaFile, @@ -332,22 +332,22 @@ public class TestDirectoryScanner { verifyStats(totalBlocks, diffsize, missingMetaFile, missingBlockFile, missingMemoryBlocks, mismatchBlocks, duplicateBlocks); } catch (AssertionError ex) { + LOG.warn("Assertion Error", ex); return false; } return true; - }, 50, 2000); + }, 100, 2000); } private void verifyStats(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile, long missingMemoryBlocks, long mismatchBlocks, long duplicateBlocks) { - assertTrue(scanner.diffs.containsKey(bpid)); - LinkedList<FsVolumeSpi.ScanInfo> diff = scanner.diffs.get(bpid); - assertTrue(scanner.stats.containsKey(bpid)); - DirectoryScanner.Stats stats = scanner.stats.get(bpid); - + Collection<FsVolumeSpi.ScanInfo> diff = scanner.diffs.getScanInfo(bpid); assertEquals(diffsize, diff.size()); + + DirectoryScanner.Stats stats = scanner.stats.get(bpid); + assertNotNull(stats); assertEquals(totalBlocks, stats.totalBlocks); assertEquals(missingMetaFile, stats.missingMetaFile); assertEquals(missingBlockFile, stats.missingBlockFile); @@ -356,20 +356,18 @@ public class TestDirectoryScanner { assertEquals(duplicateBlocks, stats.duplicateBlocks); } - @Test (timeout=300000) + @Test(timeout = 300000) public void testRetainBlockOnPersistentStorage() throws Exception { - cluster = new MiniDFSCluster - .Builder(CONF) - .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT }) - .numDataNodes(1) - .build(); + cluster = new MiniDFSCluster.Builder(CONF) + .storageTypes( + new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT }) + .numDataNodes(1).build(); try { cluster.waitActive(); - DataNode dataNode = cluster.getDataNodes().get(0); bpid = cluster.getNamesystem().getBlockPoolId(); fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0)); client = cluster.getFileSystem().getClient(); - scanner = new DirectoryScanner(dataNode, fds, CONF); + scanner = new DirectoryScanner(fds, CONF); scanner.setRetainDiffs(true); FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0)); @@ -397,24 +395,22 @@ public class TestDirectoryScanner { } } - @Test (timeout=300000) + @Test(timeout = 300000) public void testDeleteBlockOnTransientStorage() throws Exception { - cluster = new MiniDFSCluster - .Builder(CONF) - .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT }) - .numDataNodes(1) - .build(); + cluster = new MiniDFSCluster.Builder(CONF) + .storageTypes( + new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT }) + .numDataNodes(1).build(); try { cluster.waitActive(); bpid = cluster.getNamesystem().getBlockPoolId(); - DataNode dataNode = cluster.getDataNodes().get(0); fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0)); client = cluster.getFileSystem().getClient(); - scanner = new DirectoryScanner(dataNode, fds, CONF); + scanner = new DirectoryScanner(fds, CONF); scanner.setRetainDiffs(true); FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0)); - // Create a file file on RAM_DISK + // Create a file on RAM_DISK 
List<LocatedBlock> blocks = createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, true); @@ -440,14 +436,14 @@ public class TestDirectoryScanner { } } - @Test (timeout=600000) + @Test(timeout = 600000) public void testDirectoryScanner() throws Exception { // Run the test with and without parallel scanning for (int parallelism = 1; parallelism < 3; parallelism++) { runTest(parallelism); } } - + public void runTest(int parallelism) throws Exception { cluster = new MiniDFSCluster.Builder(CONF).build(); try { @@ -456,9 +452,9 @@ public class TestDirectoryScanner { fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0)); client = cluster.getFileSystem().getClient(); CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, - parallelism); - DataNode dataNode = cluster.getDataNodes().get(0); - scanner = new DirectoryScanner(dataNode, fds, CONF); + parallelism); + + scanner = new DirectoryScanner(fds, CONF); scanner.setRetainDiffs(true); // Add files with 100 blocks @@ -492,7 +488,7 @@ public class TestDirectoryScanner { // Test5: A metafile exists for which there is no block file and // a block in memory blockId = createMetaFile(); - scan(totalBlocks+1, 1, 0, 1, 1, 0); + scan(totalBlocks + 1, 1, 0, 1, 1, 0); File metafile = new File(getMetaFile(blockId)); assertTrue(!metafile.exists()); scan(totalBlocks, 0, 0, 0, 0, 0); @@ -521,7 +517,7 @@ public class TestDirectoryScanner { scan(totalBlocks, 0, 0, 0, 0, 0); // Test9: create a bunch of blocks files - for (int i = 0; i < 10 ; i++) { + for (int i = 0; i < 10; i++) { blockId = createBlockFile(); } totalBlocks += 10; @@ -529,14 +525,14 @@ public class TestDirectoryScanner { scan(totalBlocks, 0, 0, 0, 0, 0); // Test10: create a bunch of metafiles - for (int i = 0; i < 10 ; i++) { + for (int i = 0; i < 10; i++) { blockId = createMetaFile(); } - scan(totalBlocks+10, 10, 0, 10, 10, 0); + scan(totalBlocks + 10, 10, 0, 10, 10, 0); scan(totalBlocks, 0, 0, 0, 0, 0); // Test11: create a bunch block files and meta files - for (int i = 0; i < 10 ; i++) { + for (int i = 0; i < 10; i++) { blockId = createBlockMetaFile(); } totalBlocks += 10; @@ -544,7 +540,7 @@ public class TestDirectoryScanner { scan(totalBlocks, 0, 0, 0, 0, 0); // Test12: truncate block files to test block length mismatch - for (int i = 0; i < 10 ; i++) { + for (int i = 0; i < 10; i++) { truncateBlockFile(); } scan(totalBlocks, 10, 0, 0, 0, 10); @@ -557,9 +553,9 @@ public class TestDirectoryScanner { deleteMetaFile(); deleteBlockFile(); truncateBlockFile(); - scan(totalBlocks+3, 6, 2, 2, 3, 2); - scan(totalBlocks+1, 0, 0, 0, 0, 0); - + scan(totalBlocks + 3, 6, 2, 2, 3, 2); + scan(totalBlocks + 1, 0, 0, 0, 0, 0); + // Test14: make sure no throttling is happening assertTrue("Throttle appears to be engaged", scanner.timeWaitingMs.get() < 10L); @@ -567,10 +563,11 @@ public class TestDirectoryScanner { scanner.timeRunningMs.get() > 0L); // Test15: validate clean shutdown of DirectoryScanner - ////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim + //// assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not + // sim scanner.shutdown(); assertFalse(scanner.getRunStatus()); - + } finally { if (scanner != null) { scanner.shutdown(); @@ -582,17 +579,17 @@ public class TestDirectoryScanner { /** * Test that the timeslice throttle limits the report compiler thread's - * execution time correctly. We test by scanning a large block pool and + * execution time correctly. 
We test by scanning a large block pool and * comparing the time spent waiting to the time spent running. * - * The block pool has to be large, or the ratio will be off. The throttle - * allows the report compiler thread to finish its current cycle when - * blocking it, so the ratio will always be a little lower than expected. - * The smaller the block pool, the further off the ratio will be. + * The block pool has to be large, or the ratio will be off. The throttle + * allows the report compiler thread to finish its current cycle when blocking + * it, so the ratio will always be a little lower than expected. The smaller + * the block pool, the further off the ratio will be. * * @throws Exception thrown on unexpected failure */ - @Test (timeout=600000) + @Test(timeout = 600000) public void testThrottling() throws Exception { Configuration conf = new Configuration(CONF); @@ -611,10 +608,9 @@ public class TestDirectoryScanner { conf.setInt( DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, 100); - DataNode dataNode = cluster.getDataNodes().get(0); - final int maxBlocksPerFile = (int) DFSConfigKeys - .DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT; + final int maxBlocksPerFile = + (int) DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT; int numBlocksToCreate = blocks; while (numBlocksToCreate > 0) { final int toCreate = Math.min(maxBlocksPerFile, numBlocksToCreate); @@ -627,7 +623,7 @@ public class TestDirectoryScanner { int retries = maxRetries; while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) { - scanner = new DirectoryScanner(dataNode, fds, conf); + scanner = new DirectoryScanner(fds, conf); ratio = runThrottleTest(blocks); retries -= 1; } @@ -645,7 +641,7 @@ public class TestDirectoryScanner { retries = maxRetries; while ((retries > 0) && ((ratio < 2.75f) || (ratio > 4.5f))) { - scanner = new DirectoryScanner(dataNode, fds, conf); + scanner = new DirectoryScanner(fds, conf); ratio = runThrottleTest(blocks); retries -= 1; } @@ -664,7 +660,7 @@ public class TestDirectoryScanner { retries = maxRetries; while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) { - scanner = new DirectoryScanner(dataNode, fds, conf); + scanner = new DirectoryScanner(fds, conf); ratio = runThrottleTest(blocks); retries -= 1; } @@ -675,7 +671,7 @@ public class TestDirectoryScanner { assertTrue("Throttle is too permissive", ratio >= 7f); // Test with no limit - scanner = new DirectoryScanner(dataNode, fds, CONF); + scanner = new DirectoryScanner(fds, CONF); scanner.setRetainDiffs(true); scan(blocks, 0, 0, 0, 0, 0); scanner.shutdown(); @@ -686,7 +682,7 @@ public class TestDirectoryScanner { assertTrue("Report complier threads logged no execution time", scanner.timeRunningMs.get() > 0L); - // Test with a 1ms limit. This also tests whether the scanner can be + // Test with a 1ms limit. This also tests whether the scanner can be // shutdown cleanly in mid stride. 
conf.setInt( DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, @@ -698,7 +694,7 @@ public class TestDirectoryScanner { try { while ((retries > 0) && (ratio < 10)) { - scanner = new DirectoryScanner(dataNode, fds, conf); + scanner = new DirectoryScanner(fds, conf); scanner.setRetainDiffs(true); final AtomicLong nowMs = new AtomicLong(); @@ -728,7 +724,7 @@ public class TestDirectoryScanner { } ratio = - (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get(); + (float) scanner.timeWaitingMs.get() / scanner.timeRunningMs.get(); retries -= 1; } } finally { @@ -737,8 +733,7 @@ public class TestDirectoryScanner { // We just want to test that it waits a lot, but it also runs some LOG.info("RATIO: " + ratio); - assertTrue("Throttle is too permissive", - ratio > 10); + assertTrue("Throttle is too permissive", ratio > 8); assertTrue("Report complier threads logged no execution time", scanner.timeRunningMs.get() > 0L); @@ -746,7 +741,7 @@ public class TestDirectoryScanner { conf.setInt( DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, 0); - scanner = new DirectoryScanner(dataNode, fds, conf); + scanner = new DirectoryScanner(fds, conf); scanner.setRetainDiffs(true); scan(blocks, 0, 0, 0, 0, 0); scanner.shutdown(); @@ -761,7 +756,7 @@ public class TestDirectoryScanner { conf.setInt( DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, 1000); - scanner = new DirectoryScanner(dataNode, fds, conf); + scanner = new DirectoryScanner(fds, conf); scanner.setRetainDiffs(true); scan(blocks, 0, 0, 0, 0, 0); scanner.shutdown(); @@ -777,9 +772,8 @@ public class TestDirectoryScanner { conf.setInt( DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, 10); - conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, - 1); - scanner = new DirectoryScanner(dataNode, fds, conf); + conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); + scanner = new DirectoryScanner(fds, conf); scanner.setRetainDiffs(true); scanner.start(); @@ -805,7 +799,7 @@ public class TestDirectoryScanner { scanner.shutdown(); assertFalse(scanner.getRunStatus()); - return (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get(); + return (float) scanner.timeWaitingMs.get() / scanner.timeRunningMs.get(); } private void verifyAddition(long blockId, long genStamp, long size) { @@ -836,7 +830,7 @@ public class TestDirectoryScanner { assertNotNull(memBlock); assertEquals(genStamp, memBlock.getGenerationStamp()); } - + private void verifyStorageType(long blockId, boolean expectTransient) { final ReplicaInfo memBlock; memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId); @@ -859,7 +853,7 @@ public class TestDirectoryScanner { public long getAvailable() throws IOException { return 0; } - + public File getFinalizedDir(String bpid) throws IOException { return new File("/base/current/" + bpid + "/finalized"); } @@ -898,10 +892,11 @@ public class TestDirectoryScanner { @Override public BlockIterator loadBlockIterator(String bpid, String name) - throws IOException { + throws IOException { throw new UnsupportedOperationException(); } + @SuppressWarnings("rawtypes") @Override public FsDatasetSpi getDataset() { throw new UnsupportedOperationException(); @@ -923,8 +918,8 @@ public class TestDirectoryScanner { } @Override - public byte[] loadLastPartialChunkChecksum( - File blockFile, File metaFile) throws IOException { + public byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile) + throws IOException { return 
@@ -945,7 +940,6 @@ public class TestDirectoryScanner {
       return null;
     }

-
     @Override
     public VolumeCheckResult check(VolumeCheckContext context)
         throws Exception {
@@ -954,11 +948,11 @@ public class TestDirectoryScanner {
   }

   private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();
-
+
   private final static String BPID_1 = "BP-783049782-127.0.0.1-1370971773491";
-
+
   private final static String BPID_2 = "BP-367845636-127.0.0.1-5895645674231";
-
+
   void testScanInfoObject(long blockId, File blockFile, File metaFile)
       throws Exception {
     FsVolumeSpi.ScanInfo scanInfo =
@@ -978,7 +972,7 @@ public class TestDirectoryScanner {
     }
     assertEquals(TEST_VOLUME, scanInfo.getVolume());
   }
-
+
   void testScanInfoObject(long blockId) throws Exception {
     FsVolumeSpi.ScanInfo scanInfo =
         new FsVolumeSpi.ScanInfo(blockId, null, null, null);
@@ -987,7 +981,7 @@ public class TestDirectoryScanner {
     assertNull(scanInfo.getMetaFile());
   }

-  @Test(timeout=120000)
+  @Test(timeout = 120000)
   public void TestScanInfo() throws Exception {
     testScanInfoObject(123,
         new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath(),
@@ -998,13 +992,10 @@ public class TestDirectoryScanner {
         new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath(),
             "blk_123"),
         null);
-    testScanInfoObject(523,
-        null,
+    testScanInfoObject(523, null,
         new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath(),
             "blk_123__1009.meta"));
-    testScanInfoObject(789,
-        null,
-        null);
+    testScanInfoObject(789, null, null);
     testScanInfoObject(456);
     testScanInfoObject(123,
         new File(TEST_VOLUME.getFinalizedDir(BPID_2).getAbsolutePath(),
@@ -1027,7 +1018,6 @@ public class TestDirectoryScanner {
     fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
     client = cluster.getFileSystem().getClient();
     CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
-    DataNode dataNode = cluster.getDataNodes().get(0);

     // Add files with 2 blocks
     createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 2, false);
@@ -1047,7 +1037,7 @@ public class TestDirectoryScanner {
       FsDatasetSpi<? extends FsVolumeSpi> spyFds = Mockito.spy(fds);
      Mockito.doReturn(volReferences).when(spyFds).getFsVolumeReferences();
-      scanner = new DirectoryScanner(dataNode, spyFds, CONF);
+      scanner = new DirectoryScanner(spyFds, CONF);
       scanner.setRetainDiffs(true);
       scanner.reconcile();
     } finally {
@@ -1061,28 +1051,27 @@ public class TestDirectoryScanner {

   @Test
   public void testDirectoryScannerInFederatedCluster() throws Exception {
-    //Create Federated cluster with two nameservices and one DN
+    // Create Federated cluster with two nameservices and one DN
     try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF)
         .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
         .numDataNodes(1).build()) {
       cluster.waitActive();
       cluster.transitionToActive(1);
       cluster.transitionToActive(3);
-      DataNode dataNode = cluster.getDataNodes().get(0);
       fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
-      //Create one block in first nameservice
+      // Create one block in first nameservice
       FileSystem fs = cluster.getFileSystem(1);
       int bp1Files = 1;
       writeFile(fs, bp1Files);
-      //Create two blocks in second nameservice
+      // Create two blocks in second nameservice
       FileSystem fs2 = cluster.getFileSystem(3);
       int bp2Files = 2;
       writeFile(fs2, bp2Files);
-      //Call the Directory scanner
-      scanner = new DirectoryScanner(dataNode, fds, CONF);
+      // Call the Directory scanner
+      scanner = new DirectoryScanner(fds, CONF);
       scanner.setRetainDiffs(true);
       scanner.reconcile();
-      //Check blocks in corresponding BP
+      // Check blocks in corresponding BP
       GenericTestUtils.waitFor(() -> {
         try {
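Every hunk above makes the same mechanical change: DirectoryScanner is now built from just the dataset it scans and a Configuration, with no DataNode parameter. Below is a hedged sketch of the resulting call pattern, assuming a dataset reference is at hand; since setRetainDiffs() is a test hook, it may be package-visible only, so a caller like this may need to live in org.apache.hadoop.hdfs.server.datanode as TestDirectoryScanner does:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

final class ScannerDriverSketch {
  static void scanOnce(FsDatasetSpi<? extends FsVolumeSpi> fds,
      Configuration conf) throws Exception {
    // No DataNode argument anymore: the scanner needs only the dataset it
    // verifies and the configuration that supplies its throttle and interval.
    DirectoryScanner scanner = new DirectoryScanner(fds, conf);
    try {
      scanner.setRetainDiffs(true); // keep the disk/memory diff for inspection
      scanner.reconcile();          // one scan-and-reconcile pass, as the tests do
    } finally {
      scanner.shutdown();           // the tests above always shut down, even on failure
    }
  }
}

The payoff is visible in the diff itself: every test that previously fetched a DataNode from the MiniDFSCluster solely to construct a scanner simply drops that line.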
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
index 422acc3..a48e2f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
@@ -34,6 +34,7 @@ import java.io.Writer;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfoVolumeReport;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedProvidedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ProvidedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@@ -73,9 +75,9 @@ import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.TestProvidedReplicaImpl;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.StringUtils;
@@ -183,7 +185,6 @@ public class TestProvidedImpl {
   public static class TestFileRegionBlockAliasMap
       extends BlockAliasMap<FileRegion> {

-    private Configuration conf;
     private int minId;
     private int numBlocks;
     private Iterator<FileRegion> suppliedIterator;
@@ -592,11 +593,13 @@ public class TestProvidedImpl {

   @Test
   public void testScannerWithProvidedVolumes() throws Exception {
-    DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
-    Map<String, FsVolumeSpi.ScanInfo[]> report = scanner.getDiskReport();
+    DirectoryScanner scanner = new DirectoryScanner(dataset, conf);
+    Collection<ScanInfoVolumeReport> reports = scanner.getVolumeReports();
     // no blocks should be reported for the Provided volume as long as
     // the directoryScanner is disabled.
-    assertEquals(0, report.get(BLOCK_POOL_IDS[CHOSEN_BP_ID]).length);
+    for (ScanInfoVolumeReport report : reports) {
+      assertEquals(0, report.getScanInfo(BLOCK_POOL_IDS[CHOSEN_BP_ID]).size());
+    }
   }

   /**
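The hunk above also shows the reshaped reporting API: getDiskReport(), which returned a Map from block-pool id to ScanInfo arrays, gives way to getVolumeReports(), which returns one ScanInfoVolumeReport per volume, each queried per block pool. A short sketch of consuming the new shape, built only from the calls visible above (treating getScanInfo() as returning a sized collection, as its use with size() in the test suggests):

import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfoVolumeReport;

final class VolumeReportSketch {
  /** Count on-disk blocks for one block pool across all volume reports. */
  static int countBlocksOnDisk(DirectoryScanner scanner, String bpid) {
    int total = 0;
    for (ScanInfoVolumeReport report : scanner.getVolumeReports()) {
      // Each volume contributes its own ScanInfo collection for the pool.
      total += report.getScanInfo(bpid).size();
    }
    return total;
  }
}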
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
index e1c8ae3..db12146 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;

+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

 import java.io.File;
@@ -63,14 +64,19 @@ public class TestListCorruptFileBlocks {
   @Test (timeout=300000)
   public void testListCorruptFilesCorruptedBlock() throws Exception {
     MiniDFSCluster cluster = null;
-    Random random = new Random();
-
+
     try {
       Configuration conf = new HdfsConfiguration();
-      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); // datanode scans directories
-      conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3 * 1000); // datanode sends block reports
+
+      // datanode scans directories
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
+
+      // datanode sends block reports
+      conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3 * 1000);
+
       // Set short retry timeouts so this test runs faster
       conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
+
       cluster = new MiniDFSCluster.Builder(conf).build();
       FileSystem fs = cluster.getFileSystem();
@@ -84,8 +90,8 @@ public class TestListCorruptFileBlocks {
       final NameNode namenode = cluster.getNameNode();
       Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode.
         getNamesystem().listCorruptFileBlocks("/", null);
-      assertTrue("Namenode has " + badFiles.size()
-          + " corrupt files. Expecting None.", badFiles.size() == 0);
+      assertEquals("Namenode has " + badFiles.size()
+          + " corrupt files. Expecting None.", 0, badFiles.size());

       // Now deliberately corrupt one block
       String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -101,7 +107,7 @@ public class TestListCorruptFileBlocks {
       long position = channel.size() - 2;
       int length = 2;
       byte[] buffer = new byte[length];
-      random.nextBytes(buffer);
+      new Random(13L).nextBytes(buffer);
       channel.write(ByteBuffer.wrap(buffer), position);
       file.close();
       LOG.info("Deliberately corrupting file " + metaFile.getName() +
@@ -134,7 +140,6 @@ public class TestListCorruptFileBlocks {
   @Test (timeout=300000)
   public void testListCorruptFileBlocksInSafeMode() throws Exception {
     MiniDFSCluster cluster = null;
-    Random random = new Random();

     try {
       Configuration conf = new HdfsConfiguration();
@@ -164,8 +169,8 @@ public class TestListCorruptFileBlocks {
       // fetch bad file list from namenode. There should be none.
       Collection<FSNamesystem.CorruptFileBlockInfo> badFiles =
         cluster.getNameNode().getNamesystem().listCorruptFileBlocks("/", null);
-      assertTrue("Namenode has " + badFiles.size()
-          + " corrupt files. Expecting None.", badFiles.size() == 0);
+      assertEquals("Namenode has " + badFiles.size()
+          + " corrupt files. Expecting None.", 0, badFiles.size());

       // Now deliberately corrupt one block
       File storageDir = cluster.getInstanceStorageDir(0, 0);
@@ -181,7 +186,7 @@ public class TestListCorruptFileBlocks {
       long position = channel.size() - 2;
       int length = 2;
       byte[] buffer = new byte[length];
-      random.nextBytes(buffer);
+      new Random(13L).nextBytes(buffer);
       channel.write(ByteBuffer.wrap(buffer), position);
       file.close();
       LOG.info("Deliberately corrupting file " + metaFile.getName() +
@@ -318,9 +323,9 @@ public class TestListCorruptFileBlocks {
       }
       // Validate we get all the corrupt files
       LOG.info("Namenode has bad files. " + numCorrupt);
" + numCorrupt); - assertTrue(numCorrupt == 3); - // test the paging here + assertEquals(3, numCorrupt); + // test the paging here FSNamesystem.CorruptFileBlockInfo[] cfb = corruptFileBlocks .toArray(new FSNamesystem.CorruptFileBlockInfo[0]); // now get the 2nd and 3rd file that is corrupt @@ -331,7 +336,7 @@ public class TestListCorruptFileBlocks { FSNamesystem.CorruptFileBlockInfo[] ncfb = nextCorruptFileBlocks .toArray(new FSNamesystem.CorruptFileBlockInfo[0]); numCorrupt = nextCorruptFileBlocks.size(); - assertTrue(numCorrupt == 2); + assertEquals(2, numCorrupt); assertTrue(ncfb[0].block.getBlockName() .equalsIgnoreCase(cfb[1].block.getBlockName())); @@ -339,14 +344,14 @@ public class TestListCorruptFileBlocks { namenode.getNamesystem() .listCorruptFileBlocks("/corruptData", cookie); numCorrupt = corruptFileBlocks.size(); - assertTrue(numCorrupt == 0); + assertEquals(0, numCorrupt); // Do a listing on a dir which doesn't have any corrupt blocks and // validate util.createFiles(fs, "/goodData"); corruptFileBlocks = namenode.getNamesystem().listCorruptFileBlocks("/goodData", null); numCorrupt = corruptFileBlocks.size(); - assertTrue(numCorrupt == 0); + assertEquals(0, numCorrupt); util.cleanup(fs, "/corruptData"); util.cleanup(fs, "/goodData"); } finally { @@ -390,7 +395,7 @@ public class TestListCorruptFileBlocks { RemoteIterator<Path> corruptFileBlocks = dfs.listCorruptFileBlocks(new Path("/corruptData")); int numCorrupt = countPaths(corruptFileBlocks); - assertTrue(numCorrupt == 0); + assertEquals(0, numCorrupt); // delete the blocks String bpid = cluster.getNamesystem().getBlockPoolId(); // For loop through number of datadirectories per datanode (2) @@ -426,7 +431,7 @@ public class TestListCorruptFileBlocks { } // Validate we get all the corrupt files LOG.info("Namenode has bad files. " + numCorrupt); - assertTrue(numCorrupt == 3); + assertEquals(3, numCorrupt); util.cleanup(fs, "/corruptData"); util.cleanup(fs, "/goodData"); @@ -465,8 +470,9 @@ public class TestListCorruptFileBlocks { final NameNode namenode = cluster.getNameNode(); Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode. getNamesystem().listCorruptFileBlocks("/srcdat2", null); - assertTrue("Namenode has " + badFiles.size() + " corrupt files. Expecting none.", - badFiles.size() == 0); + assertEquals( + "Namenode has " + badFiles.size() + " corrupt files. Expecting none.", + 0, badFiles.size()); // Now deliberately blocks from all files final String bpid = cluster.getNamesystem().getBlockPoolId(); @@ -555,7 +561,7 @@ public class TestListCorruptFileBlocks { RemoteIterator<Path> corruptFileBlocks = dfs .listCorruptFileBlocks(new Path("corruptData")); int numCorrupt = countPaths(corruptFileBlocks); - assertTrue(numCorrupt == 0); + assertEquals(0, numCorrupt); // delete the blocks String bpid = cluster.getNamesystem().getBlockPoolId(); @@ -589,7 +595,7 @@ public class TestListCorruptFileBlocks { } // Validate we get all the corrupt files LOG.info("Namenode has bad files. " + numCorrupt); - assertTrue("Failed to get corrupt files!", numCorrupt == 3); + assertEquals("Failed to get corrupt files!", 3, numCorrupt); util.cleanup(fs, "corruptData"); } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org