HDFS-12291: [SPS]: Provide a mechanism to recursively iterate and satisfy storage policy of all the files under the given dir. Contributed by Surendra Singh Lilhore.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e94f1deb Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e94f1deb Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e94f1deb Branch: refs/heads/HDFS-10285 Commit: e94f1deb35c19c93fa8cf3dd3721860bca4b0310 Parents: 8d29166 Author: Uma Maheswara Rao G <[email protected]> Authored: Sat Sep 30 06:31:52 2017 -0700 Committer: Rakesh Radhakrishnan <[email protected]> Committed: Mon Jan 29 09:21:08 2018 +0530 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 + .../java/org/apache/hadoop/hdfs/DFSUtil.java | 22 +- .../BlockStorageMovementAttemptedItems.java | 8 +- .../namenode/BlockStorageMovementNeeded.java | 277 +++++++-- .../hdfs/server/namenode/FSTreeTraverser.java | 313 ++++++++++ .../server/namenode/ReencryptionHandler.java | 618 ++++++++----------- .../server/namenode/ReencryptionUpdater.java | 2 +- .../server/namenode/StoragePolicySatisfier.java | 43 +- .../src/main/resources/hdfs-default.xml | 23 + .../src/site/markdown/ArchivalStorage.md | 3 +- .../TestBlockStorageMovementAttemptedItems.java | 2 +- .../TestPersistentStoragePolicySatisfier.java | 8 +- .../hdfs/server/namenode/TestReencryption.java | 3 - .../namenode/TestReencryptionHandler.java | 10 +- .../namenode/TestStoragePolicySatisfier.java | 377 ++++++++++- 15 files changed, 1260 insertions(+), 457 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 8c4fa69..c435739 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -602,6 +602,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.storage.policy.satisfier.enabled"; public static final boolean DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT = false; + public static final String DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY = + "dfs.storage.policy.satisfier.queue.limit"; + public static final int DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT = + 1000; + public static final String DFS_SPS_WORK_MULTIPLIER_PER_ITERATION = + "dfs.storage.policy.satisfier.work.multiplier.per.iteration"; + public static final int DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT = + 1; public static final String DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY = "dfs.storage.policy.satisfier.recheck.timeout.millis"; public static final int DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT = http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 7465853..570b85d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java 
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -1422,7 +1422,27 @@ public class DFSUtil { "It should be a positive, non-zero integer value."); return blocksReplWorkMultiplier; } - + + /** + * Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from + * configuration. + * + * @param conf Configuration + * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION + */ + public static int getSPSWorkMultiplier(Configuration conf) { + int spsWorkMultiplier = conf + .getInt( + DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION, + DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT); + Preconditions.checkArgument( + (spsWorkMultiplier > 0), + DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION + + " = '" + spsWorkMultiplier + "' is invalid. " + + "It should be a positive, non-zero integer value."); + return spsWorkMultiplier; + } + /** * Get SPNEGO keytab Key from configuration * http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java index 278b62b..549819f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java @@ -101,7 +101,7 @@ public class BlockStorageMovementAttemptedItems { public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) { synchronized (storageMovementAttemptedItems) { AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo( - itemInfo.getRootId(), itemInfo.getTrackId(), monotonicNow(), + itemInfo.getStartId(), itemInfo.getTrackId(), monotonicNow(), allBlockLocsAttemptedToSatisfy); storageMovementAttemptedItems.put(itemInfo.getTrackId(), attemptedItemInfo); @@ -260,7 +260,7 @@ public class BlockStorageMovementAttemptedItems { synchronized (storageMovementAttemptedResults) { if (!isExistInResult(blockCollectionID)) { ItemInfo candidate = new ItemInfo( - itemInfo.getRootId(), blockCollectionID); + itemInfo.getStartId(), blockCollectionID); blockStorageMovementNeeded.add(candidate); iter.remove(); LOG.info("TrackID: {} becomes timed out and moved to needed " @@ -315,7 +315,7 @@ public class BlockStorageMovementAttemptedItems { // blockStorageMovementNeeded#removeIteamTrackInfo() for cleaning // the xAttr ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null) - ? attemptedItemInfo.getRootId() : trackId, trackId); + ? 
attemptedItemInfo.getStartId() : trackId, trackId); switch (status) { case FAILURE: if (attemptedItemInfo != null) { @@ -345,7 +345,7 @@ public class BlockStorageMovementAttemptedItems { if (attemptedItemInfo != null) { if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) { blockStorageMovementNeeded - .add(new ItemInfo(attemptedItemInfo.getRootId(), trackId)); + .add(new ItemInfo(attemptedItemInfo.getStartId(), trackId)); LOG.warn("{} But adding trackID back to retry queue as some of" + " the blocks couldn't find matching target nodes in" + " previous SPS iteration.", msg); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java index 41a3a6c..788a98b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java @@ -29,12 +29,15 @@ import java.util.Map; import java.util.Queue; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo; import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo; -import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * A Class to track the block collection IDs (Inode's ID) for which physical * storage movement needed as per the Namespace and StorageReports from DN. @@ -53,11 +56,11 @@ public class BlockStorageMovementNeeded { new LinkedList<ItemInfo>(); /** - * Map of rootId and number of child's. Number of child's indicate the number - * of files pending to satisfy the policy. + * Map of startId and number of child's. Number of child's indicate the + * number of files pending to satisfy the policy. */ - private final Map<Long, Integer> pendingWorkForDirectory = - new HashMap<Long, Integer>(); + private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory = + new HashMap<Long, DirPendingWorkInfo>(); private final Namesystem namesystem; @@ -66,12 +69,15 @@ public class BlockStorageMovementNeeded { private final StoragePolicySatisfier sps; - private Daemon fileInodeIdCollector; + private Daemon inodeIdCollector; + + private final int maxQueuedItem; public BlockStorageMovementNeeded(Namesystem namesystem, - StoragePolicySatisfier sps) { + StoragePolicySatisfier sps, int queueLimit) { this.namesystem = namesystem; this.sps = sps; + this.maxQueuedItem = queueLimit; } /** @@ -88,15 +94,24 @@ public class BlockStorageMovementNeeded { /** * Add the itemInfo to tracking list for which storage movement * expected if necessary. 
- * @param rootId - * - root inode id + * @param startId + * - start id * @param itemInfoList * - List of child in the directory */ - private synchronized void addAll(Long rootId, - List<ItemInfo> itemInfoList) { + @VisibleForTesting + public synchronized void addAll(long startId, + List<ItemInfo> itemInfoList, boolean scanCompleted) { storageMovementNeeded.addAll(itemInfoList); - pendingWorkForDirectory.put(rootId, itemInfoList.size()); + DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId); + if (pendingWork == null) { + pendingWork = new DirPendingWorkInfo(); + pendingWorkForDirectory.put(startId, pendingWork); + } + pendingWork.addPendingWorkCount(itemInfoList.size()); + if (scanCompleted) { + pendingWork.markScanCompleted(); + } } /** @@ -118,6 +133,25 @@ public class BlockStorageMovementNeeded { } } + /** + * Returns queue remaining capacity. + */ + public synchronized int remainingCapacity() { + int size = storageMovementNeeded.size(); + if (size >= maxQueuedItem) { + return 0; + } else { + return (maxQueuedItem - size); + } + } + + /** + * Returns queue size. + */ + public synchronized int size() { + return storageMovementNeeded.size(); + } + public synchronized void clearAll() { spsDirsToBeTraveresed.clear(); storageMovementNeeded.clear(); @@ -131,20 +165,20 @@ public class BlockStorageMovementNeeded { public synchronized void removeItemTrackInfo(ItemInfo trackInfo) throws IOException { if (trackInfo.isDir()) { - // If track is part of some root then reduce the pending directory work - // count. - long rootId = trackInfo.getRootId(); - INode inode = namesystem.getFSDirectory().getInode(rootId); + // If track is part of some start inode then reduce the pending + // directory work count. + long startId = trackInfo.getStartId(); + INode inode = namesystem.getFSDirectory().getInode(startId); if (inode == null) { // directory deleted just remove it. - this.pendingWorkForDirectory.remove(rootId); + this.pendingWorkForDirectory.remove(startId); } else { - if (pendingWorkForDirectory.get(rootId) != null) { - Integer pendingWork = pendingWorkForDirectory.get(rootId) - 1; - pendingWorkForDirectory.put(rootId, pendingWork); - if (pendingWork <= 0) { - namesystem.removeXattr(rootId, XATTR_SATISFY_STORAGE_POLICY); - pendingWorkForDirectory.remove(rootId); + DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId); + if (pendingWork != null) { + pendingWork.decrementPendingWorkCount(); + if (pendingWork.isDirWorkDone()) { + namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY); + pendingWorkForDirectory.remove(startId); } } } @@ -161,7 +195,7 @@ public class BlockStorageMovementNeeded { Iterator<ItemInfo> iterator = storageMovementNeeded.iterator(); while (iterator.hasNext()) { ItemInfo next = iterator.next(); - if (next.getRootId() == trackId) { + if (next.getStartId() == trackId) { iterator.remove(); } } @@ -208,7 +242,17 @@ public class BlockStorageMovementNeeded { * Take dir tack ID from the spsDirsToBeTraveresed queue and collect child * ID's to process for satisfy the policy. 
*/ - private class FileInodeIdCollector implements Runnable { + private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser + implements Runnable { + + private int remainingCapacity = 0; + + private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem); + + StorageMovementPendingInodeIdCollector(FSDirectory dir) { + super(dir); + } + @Override public void run() { LOG.info("Starting FileInodeIdCollector!."); @@ -216,38 +260,36 @@ public class BlockStorageMovementNeeded { try { if (!namesystem.isInSafeMode()) { FSDirectory fsd = namesystem.getFSDirectory(); - Long rootINodeId = spsDirsToBeTraveresed.poll(); - if (rootINodeId == null) { + Long startINodeId = spsDirsToBeTraveresed.poll(); + if (startINodeId == null) { // Waiting for SPS path synchronized (spsDirsToBeTraveresed) { spsDirsToBeTraveresed.wait(5000); } } else { - INode rootInode = fsd.getInode(rootINodeId); - if (rootInode != null) { - // TODO : HDFS-12291 - // 1. Implement an efficient recursive directory iteration - // mechanism and satisfies storage policy for all the files - // under the given directory. - // 2. Process files in batches,so datanodes workload can be - // handled. - List<ItemInfo> itemInfoList = - new ArrayList<>(); - for (INode childInode : rootInode.asDirectory() - .getChildrenList(Snapshot.CURRENT_STATE_ID)) { - if (childInode.isFile() - && childInode.asFile().numBlocks() != 0) { - itemInfoList.add( - new ItemInfo(rootINodeId, childInode.getId())); - } + INode startInode = fsd.getInode(startINodeId); + if (startInode != null) { + try { + remainingCapacity = remainingCapacity(); + readLock(); + traverseDir(startInode.asDirectory(), startINodeId, + HdfsFileStatus.EMPTY_NAME, + new SPSTraverseInfo(startINodeId)); + } finally { + readUnlock(); } - if (itemInfoList.isEmpty()) { - // satisfy track info is empty, so remove the xAttr from the - // directory - namesystem.removeXattr(rootINodeId, + // Mark startInode traverse is done + addAll(startInode.getId(), currentBatch, true); + currentBatch.clear(); + + // check if directory was empty and no child added to queue + DirPendingWorkInfo dirPendingWorkInfo = + pendingWorkForDirectory.get(startInode.getId()); + if (dirPendingWorkInfo.isDirWorkDone()) { + namesystem.removeXattr(startInode.getId(), XATTR_SATISFY_STORAGE_POLICY); + pendingWorkForDirectory.remove(startInode.getId()); } - addAll(rootINodeId, itemInfoList); } } } @@ -256,17 +298,140 @@ public class BlockStorageMovementNeeded { } } } + + @Override + protected void checkPauseForTesting() throws InterruptedException { + // TODO implement if needed + } + + @Override + protected boolean processFileInode(INode inode, TraverseInfo traverseInfo) + throws IOException, InterruptedException { + assert getFSDirectory().hasReadLock(); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing {} for statisy the policy", + inode.getFullPathName()); + } + if (!inode.isFile()) { + return false; + } + if (inode.isFile() && inode.asFile().numBlocks() != 0) { + currentBatch.add(new ItemInfo( + ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId())); + remainingCapacity--; + } + return true; + } + + @Override + protected boolean canSubmitCurrentBatch() { + return remainingCapacity <= 0; + } + + @Override + protected void checkINodeReady(long startId) throws IOException { + FSNamesystem fsn = ((FSNamesystem) namesystem); + fsn.checkNameNodeSafeMode("NN is in safe mode," + + "cannot satisfy the policy."); + // SPS work should be cancelled when NN goes to standby. Just + // double checking for sanity. 
+ fsn.checkOperation(NameNode.OperationCategory.WRITE); + } + + @Override + protected void submitCurrentBatch(long startId) + throws IOException, InterruptedException { + // Add current child's to queue + addAll(startId, currentBatch, false); + currentBatch.clear(); + } + + @Override + protected void throttle() throws InterruptedException { + assert !getFSDirectory().hasReadLock(); + assert !namesystem.hasReadLock(); + if (LOG.isDebugEnabled()) { + LOG.debug("StorageMovementNeeded queue remaining capacity is zero," + + " waiting for some free slots."); + } + remainingCapacity = remainingCapacity(); + // wait for queue to be free + while (remainingCapacity <= 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting for storageMovementNeeded queue to be free!"); + } + Thread.sleep(5000); + remainingCapacity = remainingCapacity(); + } + } + + @Override + protected boolean canTraverseDir(INode inode) throws IOException { + return true; + } } - public void start() { - fileInodeIdCollector = new Daemon(new FileInodeIdCollector()); - fileInodeIdCollector.setName("FileInodeIdCollector"); - fileInodeIdCollector.start(); + /** + * Info for directory recursive scan. + */ + public static class DirPendingWorkInfo { + + private int pendingWorkCount = 0; + private boolean fullyScanned = false; + + /** + * Increment the pending work count for directory. + */ + public synchronized void addPendingWorkCount(int count) { + this.pendingWorkCount = this.pendingWorkCount + count; + } + + /** + * Decrement the pending work count for directory one track info is + * completed. + */ + public synchronized void decrementPendingWorkCount() { + this.pendingWorkCount--; + } + + /** + * Return true if all the pending work is done and directory fully + * scanned, otherwise false. + */ + public synchronized boolean isDirWorkDone() { + return (pendingWorkCount <= 0 && fullyScanned); + } + + /** + * Mark directory scan is completed. + */ + public synchronized void markScanCompleted() { + this.fullyScanned = true; + } } - public void stop() { - if (fileInodeIdCollector != null) { - fileInodeIdCollector.interrupt(); + public void init() { + inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector( + namesystem.getFSDirectory())); + inodeIdCollector.setName("FileInodeIdCollector"); + inodeIdCollector.start(); + } + + public void close() { + if (inodeIdCollector != null) { + inodeIdCollector.interrupt(); + } + } + + class SPSTraverseInfo extends TraverseInfo { + private long startId; + + SPSTraverseInfo(long startId) { + this.startId = startId; + } + + public long getStartId() { + return startId; } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java new file mode 100644 index 0000000..acc23e5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; +import org.apache.hadoop.hdfs.util.ReadOnlyList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * FSTreeTraverser traverse directory recursively and process files + * in batches. + */ [email protected] +public abstract class FSTreeTraverser { + + public static final Logger LOG = LoggerFactory + .getLogger(FSTreeTraverser.class); + + private FSDirectory dir; + + public FSTreeTraverser(FSDirectory dir) { + this.dir = dir; + } + + public FSDirectory getFSDirectory() { + return dir; + } + + /** + * Iterate through all files directly inside parent, and recurse down + * directories. The listing is done in batch, and can optionally start after + * a position. The iteration of the inode tree is done in a depth-first + * fashion. But instead of holding all {@link INodeDirectory}'s in memory + * on the fly, only the path components to the current inode is held. This + * is to reduce memory consumption. + * + * @param parent + * The inode id of parent directory + * @param startId + * Id of the start inode. + * @param startAfter + * Full path of a file the traverse should start after. + * @param traverseInfo + * info which may required for processing the child's. + * @throws IOException + * @throws InterruptedException + */ + protected void traverseDir(final INodeDirectory parent, final long startId, + byte[] startAfter, final TraverseInfo traverseInfo) + throws IOException, InterruptedException { + List<byte[]> startAfters = new ArrayList<>(); + if (parent == null) { + return; + } + INode curr = parent; + // construct startAfters all the way up to the zone inode. + startAfters.add(startAfter); + while (curr.getId() != startId) { + startAfters.add(0, curr.getLocalNameBytes()); + curr = curr.getParent(); + } + curr = traverseDirInt(startId, parent, startAfters, traverseInfo); + while (!startAfters.isEmpty()) { + if (curr == null) { + // lock was reacquired, re-resolve path. + curr = resolvePaths(startId, startAfters); + } + curr = traverseDirInt(startId, curr, startAfters, traverseInfo); + } + } + + /** + * Iterates the parent directory, and add direct children files to current + * batch. If batch size meets configured threshold, current batch will be + * submitted for the processing. + * <p> + * Locks could be released and reacquired when a batch submission is + * finished. + * + * @param startId + * Id of the start inode. + * @return The inode which was just processed, if lock is held in the entire + * process. Null if lock is released. 
+ * @throws IOException + * @throws InterruptedException + */ + protected INode traverseDirInt(final long startId, INode curr, + List<byte[]> startAfters, final TraverseInfo traverseInfo) + throws IOException, InterruptedException { + assert dir.hasReadLock(); + assert dir.getFSNamesystem().hasReadLock(); + Preconditions.checkNotNull(curr, "Current inode can't be null"); + checkINodeReady(startId); + final INodeDirectory parent = curr.isDirectory() ? curr.asDirectory() + : curr.getParent(); + ReadOnlyList<INode> children = parent + .getChildrenList(Snapshot.CURRENT_STATE_ID); + if (LOG.isDebugEnabled()) { + LOG.debug("Traversing directory {}", parent.getFullPathName()); + } + + final byte[] startAfter = startAfters.get(startAfters.size() - 1); + boolean lockReleased = false; + for (int i = INodeDirectory.nextChild(children, startAfter); i < children + .size(); ++i) { + final INode inode = children.get(i); + if (!processFileInode(inode, traverseInfo)) { + // inode wasn't processes. Recurse down if it's a dir, + // skip otherwise. + if (!inode.isDirectory()) { + continue; + } + + if (!canTraverseDir(inode)) { + continue; + } + // add 1 level to the depth-first search. + curr = inode; + if (!startAfters.isEmpty()) { + startAfters.remove(startAfters.size() - 1); + startAfters.add(curr.getLocalNameBytes()); + } + startAfters.add(HdfsFileStatus.EMPTY_NAME); + return lockReleased ? null : curr; + } + if (canSubmitCurrentBatch()) { + final byte[] currentStartAfter = inode.getLocalNameBytes(); + final String parentPath = parent.getFullPathName(); + lockReleased = true; + readUnlock(); + submitCurrentBatch(startId); + try { + throttle(); + checkPauseForTesting(); + } finally { + readLock(); + } + checkINodeReady(startId); + + // Things could have changed when the lock was released. + // Re-resolve the parent inode. + FSPermissionChecker pc = dir.getPermissionChecker(); + INode newParent = dir + .resolvePath(pc, parentPath, FSDirectory.DirOp.READ) + .getLastINode(); + if (newParent == null || !newParent.equals(parent)) { + // parent dir is deleted or recreated. We're done. + return null; + } + children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID); + // -1 to counter the ++ on the for loop + i = INodeDirectory.nextChild(children, currentStartAfter) - 1; + } + } + // Successfully finished this dir, adjust pointers to 1 level up, and + // startAfter this dir. + startAfters.remove(startAfters.size() - 1); + if (!startAfters.isEmpty()) { + startAfters.remove(startAfters.size() - 1); + startAfters.add(curr.getLocalNameBytes()); + } + curr = curr.getParent(); + return lockReleased ? null : curr; + } + + /** + * Resolve the cursor of traverse to an inode. + * <p> + * The parent of the lowest level startAfter is returned. If somewhere in the + * middle of startAfters changed, the parent of the lowest unchanged level is + * returned. + * + * @param startId + * Id of the start inode. + * @param startAfters + * the cursor, represented by a list of path bytes. + * @return the parent inode corresponding to the startAfters, or null if the + * furthest parent is deleted. + */ + private INode resolvePaths(final long startId, List<byte[]> startAfters) + throws IOException { + // If the readlock was reacquired, we need to resolve the paths again + // in case things have changed. If our cursor file/dir is changed, + // continue from the next one. 
+ INode zoneNode = dir.getInode(startId); + if (zoneNode == null) { + throw new FileNotFoundException("Zone " + startId + " is deleted."); + } + INodeDirectory parent = zoneNode.asDirectory(); + for (int i = 0; i < startAfters.size(); ++i) { + if (i == startAfters.size() - 1) { + // last startAfter does not need to be resolved, since search for + // nextChild will cover that automatically. + break; + } + INode curr = parent.getChild(startAfters.get(i), + Snapshot.CURRENT_STATE_ID); + if (curr == null) { + // inode at this level has changed. Update startAfters to point to + // the next dir at the parent level (and dropping any startAfters + // at lower levels). + for (; i < startAfters.size(); ++i) { + startAfters.remove(startAfters.size() - 1); + } + break; + } + parent = curr.asDirectory(); + } + return parent; + } + + protected void readLock() { + dir.getFSNamesystem().readLock(); + dir.readLock(); + } + + protected void readUnlock() { + dir.readUnlock(); + dir.getFSNamesystem().readUnlock("FSTreeTraverser"); + } + + + protected abstract void checkPauseForTesting() throws InterruptedException; + + /** + * Process an Inode. Add to current batch if it's a file, no-op otherwise. + * + * @param inode + * the inode + * @return true if inode is added to currentBatch and should be process for + * next operation. false otherwise: could be inode is not a file. + * @throws IOException + * @throws InterruptedException + */ + protected abstract boolean processFileInode(INode inode, + TraverseInfo traverseInfo) throws IOException, InterruptedException; + + /** + * Check whether current batch can be submitted for the processing. + * + * @return true if batch size meets meet the condition, otherwise false. + */ + protected abstract boolean canSubmitCurrentBatch(); + + /** + * Check whether inode is ready for traverse. Throws IOE if it's not. + * + * @param startId + * Id of the start inode. + * @throws IOException + */ + protected abstract void checkINodeReady(long startId) throws IOException; + + /** + * Submit the current batch for processing. + * + * @param startId + * Id of the start inode. + * @throws IOException + * @throws InterruptedException + */ + protected abstract void submitCurrentBatch(long startId) + throws IOException, InterruptedException; + + /** + * Throttles the FSTreeTraverser. + * + * @throws InterruptedException + */ + protected abstract void throttle() throws InterruptedException; + + /** + * Check whether dir is traversable or not. + * + * @param inode + * Dir inode + * @return true if dir is traversable otherwise false. + * @throws IOException + */ + protected abstract boolean canTraverseDir(INode inode) throws IOException; + + /** + * Class will represent the additional info required for traverse. 
+ */ + public static class TraverseInfo { + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java index 01c2038..9b00519 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; @@ -30,18 +31,16 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.ReencryptionStatus; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State; -import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; +import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo; import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo; import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask; import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker; -import org.apache.hadoop.hdfs.util.ReadOnlyList; import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; import java.io.IOException; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -118,6 +117,8 @@ public class ReencryptionHandler implements Runnable { // be single-threaded, see class javadoc for more details. 
private ReencryptionBatch currentBatch; + private ReencryptionPendingInodeIdCollector traverser; + private final ReencryptionUpdater reencryptionUpdater; private ExecutorService updaterExecutor; @@ -186,16 +187,6 @@ public class ReencryptionHandler implements Runnable { reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count); } - private synchronized void checkPauseForTesting() throws InterruptedException { - assert !dir.hasReadLock(); - assert !dir.getFSNamesystem().hasReadLock(); - while (shouldPauseForTesting) { - LOG.info("Sleeping in the re-encrypt handler for unit test."); - wait(); - LOG.info("Continuing re-encrypt handler after pausing."); - } - } - ReencryptionHandler(final EncryptionZoneManager ezMgr, final Configuration conf) { this.ezManager = ezMgr; @@ -256,6 +247,7 @@ public class ReencryptionHandler implements Runnable { reencryptionUpdater = new ReencryptionUpdater(dir, batchService, this, conf); currentBatch = new ReencryptionBatch(reencryptBatchSize); + traverser = new ReencryptionPendingInodeIdCollector(dir, this); } ReencryptionStatus getReencryptionStatus() { @@ -339,7 +331,7 @@ public class ReencryptionHandler implements Runnable { synchronized (this) { wait(interval); } - checkPauseForTesting(); + traverser.checkPauseForTesting(); } catch (InterruptedException ie) { LOG.info("Re-encrypt handler interrupted. Exiting"); Thread.currentThread().interrupt(); @@ -397,7 +389,7 @@ public class ReencryptionHandler implements Runnable { final INode zoneNode; final ZoneReencryptionStatus zs; - readLock(); + traverser.readLock(); try { zoneNode = dir.getInode(zoneId); // start re-encrypting the zone from the beginning @@ -419,18 +411,19 @@ public class ReencryptionHandler implements Runnable { zoneId); if (zs.getLastCheckpointFile() == null) { // new re-encryption - reencryptDir(zoneNode.asDirectory(), zoneId, HdfsFileStatus.EMPTY_NAME, - zs.getEzKeyVersionName()); + traverser.traverseDir(zoneNode.asDirectory(), zoneId, + HdfsFileStatus.EMPTY_NAME, + new ZoneTraverseInfo(zs.getEzKeyVersionName())); } else { // resuming from a past re-encryption restoreFromLastProcessedFile(zoneId, zs); } // save the last batch and mark complete - submitCurrentBatch(zoneId); + traverser.submitCurrentBatch(zoneId); LOG.info("Submission completed of zone {} for re-encryption.", zoneId); reencryptionUpdater.markZoneSubmissionDone(zoneId); } finally { - readUnlock(); + traverser.readUnlock(); } } @@ -479,131 +472,8 @@ public class ReencryptionHandler implements Runnable { dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ); parent = lpfIIP.getLastINode().getParent(); startAfter = lpfIIP.getLastINode().getLocalNameBytes(); - reencryptDir(parent, zoneId, startAfter, zs.getEzKeyVersionName()); - } - - /** - * Iterate through all files directly inside parent, and recurse down - * directories. The listing is done in batch, and can optionally start after - * a position. - * <p> - * Each batch is then send to the threadpool, where KMS will be contacted and - * edek re-encrypted. {@link ReencryptionUpdater} handles the tasks completed - * from the threadpool. - * <p> - * The iteration of the inode tree is done in a depth-first fashion. But - * instead of holding all INodeDirectory's in memory on the fly, only the - * path components to the current inode is held. This is to reduce memory - * consumption. - * - * @param parent The inode id of parent directory - * @param zoneId Id of the EZ inode - * @param startAfter Full path of a file the re-encrypt should start after. 
- * @throws IOException - * @throws InterruptedException - */ - private void reencryptDir(final INodeDirectory parent, final long zoneId, - byte[] startAfter, final String ezKeyVerName) - throws IOException, InterruptedException { - List<byte[]> startAfters = new ArrayList<>(); - if (parent == null) { - return; - } - INode curr = parent; - // construct startAfters all the way up to the zone inode. - startAfters.add(startAfter); - while (curr.getId() != zoneId) { - startAfters.add(0, curr.getLocalNameBytes()); - curr = curr.getParent(); - } - curr = reencryptDirInt(zoneId, parent, startAfters, ezKeyVerName); - while (!startAfters.isEmpty()) { - if (curr == null) { - // lock was reacquired, re-resolve path. - curr = resolvePaths(zoneId, startAfters); - } - curr = reencryptDirInt(zoneId, curr, startAfters, ezKeyVerName); - } - } - - /** - * Resolve the cursor of re-encryption to an inode. - * <p> - * The parent of the lowest level startAfter is returned. If somewhere in the - * middle of startAfters changed, the parent of the lowest unchanged level is - * returned. - * - * @param zoneId Id of the EZ inode. - * @param startAfters the cursor, represented by a list of path bytes. - * @return the parent inode corresponding to the startAfters, or null if - * the EZ node (furthest parent) is deleted. - */ - private INode resolvePaths(final long zoneId, List<byte[]> startAfters) - throws IOException { - // If the readlock was reacquired, we need to resolve the paths again - // in case things have changed. If our cursor file/dir is changed, - // continue from the next one. - INode zoneNode = dir.getInode(zoneId); - if (zoneNode == null) { - throw new FileNotFoundException("Zone " + zoneId + " is deleted."); - } - INodeDirectory parent = zoneNode.asDirectory(); - for (int i = 0; i < startAfters.size(); ++i) { - if (i == startAfters.size() - 1) { - // last startAfter does not need to be resolved, since search for - // nextChild will cover that automatically. - break; - } - INode curr = - parent.getChild(startAfters.get(i), Snapshot.CURRENT_STATE_ID); - if (curr == null) { - // inode at this level has changed. Update startAfters to point to - // the next dir at the parent level (and dropping any startAfters - // at lower levels). - for (; i < startAfters.size(); ++i) { - startAfters.remove(startAfters.size() - 1); - } - break; - } - parent = curr.asDirectory(); - } - return parent; - } - - /** - * Submit the current batch to the thread pool. - * - * @param zoneId Id of the EZ INode - * @throws IOException - * @throws InterruptedException - */ - private void submitCurrentBatch(final long zoneId) - throws IOException, InterruptedException { - assert dir.hasReadLock(); - if (currentBatch.isEmpty()) { - return; - } - ZoneSubmissionTracker zst; - synchronized (this) { - zst = submissions.get(zoneId); - if (zst == null) { - zst = new ZoneSubmissionTracker(); - submissions.put(zoneId, zst); - } - } - Future future = batchService - .submit(new EDEKReencryptCallable(zoneId, currentBatch, this)); - zst.addTask(future); - LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.", - currentBatch.getFirstFilePath(), currentBatch.size(), zoneId); - currentBatch = new ReencryptionBatch(reencryptBatchSize); - // flip the pause flag if this is nth submission. - // The actual pause need to happen outside of the lock. 
- if (pauseAfterNthSubmission > 0) { - if (--pauseAfterNthSubmission == 0) { - shouldPauseForTesting = true; - } - } + traverser.traverseDir(parent, zoneId, startAfter, + new ZoneTraverseInfo(zs.getEzKeyVersionName())); } final class ReencryptionBatch { @@ -711,256 +581,270 @@ public class ReencryptionHandler implements Runnable { } } + /** - * Iterates the parent directory, and add direct children files to - * current batch. If batch size meets configured threshold, a Callable - * is created and sent to the thread pool, which will communicate to the KMS - * to get new edeks. - * <p> - * Locks could be released and reacquired when a Callable is created. - * - * @param zoneId Id of the EZ INode - * @return The inode which was just processed, if lock is held in the entire - * process. Null if lock is released. - * @throws IOException - * @throws InterruptedException + * Called when a new zone is submitted for re-encryption. This will interrupt + * the background thread if it's waiting for the next + * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY. */ - private INode reencryptDirInt(final long zoneId, INode curr, - List<byte[]> startAfters, final String ezKeyVerName) - throws IOException, InterruptedException { - assert dir.hasReadLock(); - assert dir.getFSNamesystem().hasReadLock(); - Preconditions.checkNotNull(curr, "Current inode can't be null"); - checkZoneReady(zoneId); - final INodeDirectory parent = - curr.isDirectory() ? curr.asDirectory() : curr.getParent(); - ReadOnlyList<INode> children = - parent.getChildrenList(Snapshot.CURRENT_STATE_ID); - if (LOG.isDebugEnabled()) { - LOG.debug("Re-encrypting directory {}", parent.getFullPathName()); - } - - final byte[] startAfter = startAfters.get(startAfters.size() - 1); - boolean lockReleased = false; - for (int i = INodeDirectory.nextChild(children, startAfter); - i < children.size(); ++i) { - final INode inode = children.get(i); - if (!reencryptINode(inode, ezKeyVerName)) { - // inode wasn't added for re-encryption. Recurse down if it's a dir, - // skip otherwise. - if (!inode.isDirectory()) { - continue; - } - if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) { - // nested EZ, ignore. - LOG.info("{}({}) is a nested EZ, skipping for re-encryption", - inode.getFullPathName(), inode.getId()); - continue; + synchronized void notifyNewSubmission() { + LOG.debug("Notifying handler for new re-encryption command."); + this.notify(); + } + + public ReencryptionPendingInodeIdCollector getTraverser() { + return traverser; + } + + /** + * ReencryptionPendingInodeIdCollector which throttle based on configured + * throttle ratio. + */ + class ReencryptionPendingInodeIdCollector extends FSTreeTraverser { + + private ReencryptionHandler reencryptionHandler; + + ReencryptionPendingInodeIdCollector(FSDirectory dir, + ReencryptionHandler rHandler) { + super(dir); + this.reencryptionHandler = rHandler; + } + + @Override + protected void checkPauseForTesting() + throws InterruptedException { + assert !dir.hasReadLock(); + assert !dir.getFSNamesystem().hasReadLock(); + while (shouldPauseForTesting) { + LOG.info("Sleeping in the re-encrypt handler for unit test."); + synchronized (reencryptionHandler) { + reencryptionHandler.wait(30000); } - // add 1 level to the depth-first search. - curr = inode; - if (!startAfters.isEmpty()) { - startAfters.remove(startAfters.size() - 1); - startAfters.add(curr.getLocalNameBytes()); + LOG.info("Continuing re-encrypt handler after pausing."); + } + } + + /** + * Process an Inode for re-encryption. 
Add to current batch if it's a file, + * no-op otherwise. + * + * @param inode + * the inode + * @return true if inode is added to currentBatch and should be + * re-encrypted. false otherwise: could be inode is not a file, or + * inode's edek's key version is not changed. + * @throws IOException + * @throws InterruptedException + */ + @Override + public boolean processFileInode(INode inode, TraverseInfo traverseInfo) + throws IOException, InterruptedException { + assert dir.hasReadLock(); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing {} for re-encryption", inode.getFullPathName()); + } + if (!inode.isFile()) { + return false; + } + FileEncryptionInfo feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo( + dir, INodesInPath.fromINode(inode)); + if (feInfo == null) { + LOG.warn("File {} skipped re-encryption because it is not encrypted! " + + "This is very likely a bug.", inode.getId()); + return false; + } + if (traverseInfo instanceof ZoneTraverseInfo + && ((ZoneTraverseInfo) traverseInfo).getEzKeyVerName().equals( + feInfo.getEzKeyVersionName())) { + if (LOG.isDebugEnabled()) { + LOG.debug("File {} skipped re-encryption because edek's key version" + + " name is not changed.", inode.getFullPathName()); } - startAfters.add(HdfsFileStatus.EMPTY_NAME); - return lockReleased ? null : curr; + return false; + } + currentBatch.add(inode.asFile()); + return true; + } + + /** + * Check whether zone is ready for re-encryption. Throws IOE if it's not. 1. + * If EZ is deleted. 2. if the re-encryption is canceled. 3. If NN is not + * active or is in safe mode. + * + * @throws IOException + * if zone does not exist / is cancelled, or if NN is not ready + * for write. + */ + @Override + protected void checkINodeReady(long zoneId) throws IOException { + final ZoneReencryptionStatus zs = getReencryptionStatus().getZoneStatus( + zoneId); + if (zs == null) { + throw new IOException("Zone " + zoneId + " status cannot be found."); + } + if (zs.isCanceled()) { + throw new IOException("Re-encryption is canceled for zone " + zoneId); } - if (currentBatch.size() >= reencryptBatchSize) { - final byte[] currentStartAfter = inode.getLocalNameBytes(); - final String parentPath = parent.getFullPathName(); - submitCurrentBatch(zoneId); - lockReleased = true; - readUnlock(); - try { - throttle(); - checkPauseForTesting(); - } finally { - readLock(); + dir.getFSNamesystem().checkNameNodeSafeMode( + "NN is in safe mode, cannot re-encrypt."); + // re-encryption should be cancelled when NN goes to standby. Just + // double checking for sanity. + dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE); + } + + /** + * Submit the current batch to the thread pool. + * + * @param zoneId + * Id of the EZ INode + * @throws IOException + * @throws InterruptedException + */ + @Override + protected void submitCurrentBatch(final long zoneId) throws IOException, + InterruptedException { + if (currentBatch.isEmpty()) { + return; + } + ZoneSubmissionTracker zst; + synchronized (ReencryptionHandler.this) { + zst = submissions.get(zoneId); + if (zst == null) { + zst = new ZoneSubmissionTracker(); + submissions.put(zoneId, zst); } - checkZoneReady(zoneId); - - // Things could have changed when the lock was released. - // Re-resolve the parent inode. - FSPermissionChecker pc = dir.getPermissionChecker(); - INode newParent = - dir.resolvePath(pc, parentPath, FSDirectory.DirOp.READ) - .getLastINode(); - if (newParent == null || !newParent.equals(parent)) { - // parent dir is deleted or recreated. We're done. 
- return null; + } + Future future = batchService.submit(new EDEKReencryptCallable(zoneId, + currentBatch, reencryptionHandler)); + zst.addTask(future); + LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.", + currentBatch.getFirstFilePath(), currentBatch.size(), zoneId); + currentBatch = new ReencryptionBatch(reencryptBatchSize); + // flip the pause flag if this is nth submission. + // The actual pause need to happen outside of the lock. + if (pauseAfterNthSubmission > 0) { + if (--pauseAfterNthSubmission == 0) { + shouldPauseForTesting = true; } - children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID); - // -1 to counter the ++ on the for loop - i = INodeDirectory.nextChild(children, currentStartAfter) - 1; } } - // Successfully finished this dir, adjust pointers to 1 level up, and - // startAfter this dir. - startAfters.remove(startAfters.size() - 1); - if (!startAfters.isEmpty()) { - startAfters.remove(startAfters.size() - 1); - startAfters.add(curr.getLocalNameBytes()); - } - curr = curr.getParent(); - return lockReleased ? null : curr; - } - private void readLock() { - dir.getFSNamesystem().readLock(); - dir.readLock(); - throttleTimerLocked.start(); - } + /** + * Throttles the ReencryptionHandler in 3 aspects: + * 1. Prevents generating more Callables than the CPU could possibly + * handle. + * 2. Prevents generating more Callables than the ReencryptionUpdater + * can handle, under its own throttling. + * 3. Prevents contending FSN/FSD read locks. This is done based + * on the DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration. + * <p> + * Item 1 and 2 are to control NN heap usage. + * + * @throws InterruptedException + */ + @VisibleForTesting + @Override + protected void throttle() throws InterruptedException { + assert !dir.hasReadLock(); + assert !dir.getFSNamesystem().hasReadLock(); + final int numCores = Runtime.getRuntime().availableProcessors(); + if (taskQueue.size() >= numCores) { + LOG.debug("Re-encryption handler throttling because queue size {} is" + + "larger than number of cores {}", taskQueue.size(), numCores); + while (taskQueue.size() >= numCores) { + Thread.sleep(100); + } + } - private void readUnlock() { - dir.readUnlock(); - dir.getFSNamesystem().readUnlock("reencryptHandler"); - throttleTimerLocked.stop(); - } + // 2. if tasks are piling up on the updater, don't create new callables + // until the queue size goes down. + final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2; + int numTasks = numTasksSubmitted(); + if (numTasks >= maxTasksPiled) { + LOG.debug("Re-encryption handler throttling because total tasks pending" + + " re-encryption updater is {}", numTasks); + while (numTasks >= maxTasksPiled) { + Thread.sleep(500); + numTasks = numTasksSubmitted(); + } + } - /** - * Throttles the ReencryptionHandler in 3 aspects: - * 1. Prevents generating more Callables than the CPU could possibly handle. - * 2. Prevents generating more Callables than the ReencryptionUpdater can - * handle, under its own throttling - * 3. Prevents contending FSN/FSD read locks. This is done based on the - * DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration. - * <p> - * Item 1 and 2 are to control NN heap usage. - * - * @throws InterruptedException - */ - @VisibleForTesting - void throttle() throws InterruptedException { - // 1. 
- final int numCores = Runtime.getRuntime().availableProcessors(); - if (taskQueue.size() >= numCores) { - LOG.debug("Re-encryption handler throttling because queue size {} is" - + "larger than number of cores {}", taskQueue.size(), numCores); - while (taskQueue.size() >= numCores) { - Thread.sleep(100); + // 3. + if (throttleLimitHandlerRatio >= 1.0) { + return; + } + final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS) + * throttleLimitHandlerRatio); + final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS); + if (LOG.isDebugEnabled()) { + LOG.debug("Re-encryption handler throttling expect: {}, actual: {}," + + " throttleTimerAll:{}", expect, actual, + throttleTimerAll.now(TimeUnit.MILLISECONDS)); } + if (expect - actual < 0) { + // in case throttleLimitHandlerRatio is very small, expect will be 0. + // so sleepMs should not be calculated from expect, to really meet the + // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs + // should be 1000 - throttleTimerAll.now() + final long sleepMs = (long) (actual / throttleLimitHandlerRatio) + - throttleTimerAll.now(TimeUnit.MILLISECONDS); + LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs); + Thread.sleep(sleepMs); + } + throttleTimerAll.reset().start(); + throttleTimerLocked.reset(); } - // 2. if tasks are piling up on the updater, don't create new callables - // until the queue size goes down. - final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2; - int numTasks = numTasksSubmitted(); - if (numTasks >= maxTasksPiled) { - LOG.debug("Re-encryption handler throttling because total tasks pending" - + " re-encryption updater is {}", numTasks); - while (numTasks >= maxTasksPiled) { - Thread.sleep(500); - numTasks = numTasksSubmitted(); + private int numTasksSubmitted() { + int ret = 0; + synchronized (ReencryptionHandler.this) { + for (ZoneSubmissionTracker zst : submissions.values()) { + ret += zst.getTasks().size(); + } } + return ret; } - // 3. - if (throttleLimitHandlerRatio >= 1.0) { - return; - } - final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS) - * throttleLimitHandlerRatio); - final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS); - if (LOG.isDebugEnabled()) { - LOG.debug("Re-encryption handler throttling expect: {}, actual: {}," - + " throttleTimerAll:{}", expect, actual, - throttleTimerAll.now(TimeUnit.MILLISECONDS)); - } - if (expect - actual < 0) { - // in case throttleLimitHandlerRatio is very small, expect will be 0. - // so sleepMs should not be calculated from expect, to really meet the - // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs - // should be 1000 - throttleTimerAll.now() - final long sleepMs = - (long) (actual / throttleLimitHandlerRatio) - throttleTimerAll - .now(TimeUnit.MILLISECONDS); - LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs); - Thread.sleep(sleepMs); + @Override + public boolean canSubmitCurrentBatch() { + return currentBatch.size() >= reencryptBatchSize; } - throttleTimerAll.reset().start(); - throttleTimerLocked.reset(); - } - private synchronized int numTasksSubmitted() { - int ret = 0; - for (ZoneSubmissionTracker zst : submissions.values()) { - ret += zst.getTasks().size(); + @Override + public boolean canTraverseDir(INode inode) throws IOException { + if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) { + // nested EZ, ignore. 
+ LOG.info("{}({}) is a nested EZ, skipping for re-encryption", + inode.getFullPathName(), inode.getId()); + return false; + } + return true; } - return ret; - } - /** - * Process an Inode for re-encryption. Add to current batch if it's a file, - * no-op otherwise. - * - * @param inode the inode - * @return true if inode is added to currentBatch and should be re-encrypted. - * false otherwise: could be inode is not a file, or inode's edek's - * key version is not changed. - * @throws IOException - * @throws InterruptedException - */ - private boolean reencryptINode(final INode inode, final String ezKeyVerName) - throws IOException, InterruptedException { - assert dir.hasReadLock(); - if (LOG.isTraceEnabled()) { - LOG.trace("Processing {} for re-encryption", inode.getFullPathName()); - } - if (!inode.isFile()) { - return false; - } - FileEncryptionInfo feInfo = FSDirEncryptionZoneOp - .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode)); - if (feInfo == null) { - LOG.warn("File {} skipped re-encryption because it is not encrypted! " - + "This is very likely a bug.", inode.getId()); - return false; + @Override + protected void readLock() { + dir.getFSNamesystem().readLock(); + dir.readLock(); + throttleTimerLocked.start(); } - if (ezKeyVerName.equals(feInfo.getEzKeyVersionName())) { - if (LOG.isDebugEnabled()) { - LOG.debug("File {} skipped re-encryption because edek's key version" - + " name is not changed.", inode.getFullPathName()); - } - return false; + + @Override + protected void readUnlock() { + dir.readUnlock(); + dir.getFSNamesystem().readUnlock("reencryptHandler"); + throttleTimerLocked.stop(); } - currentBatch.add(inode.asFile()); - return true; } - /** - * Check whether zone is ready for re-encryption. Throws IOE if it's not. - * 1. If EZ is deleted. - * 2. if the re-encryption is canceled. - * 3. If NN is not active or is in safe mode. - * - * @throws IOException if zone does not exist / is cancelled, or if NN is not - * ready for write. - */ - void checkZoneReady(final long zoneId) - throws RetriableException, SafeModeException, IOException { - final ZoneReencryptionStatus zs = - getReencryptionStatus().getZoneStatus(zoneId); - if (zs == null) { - throw new IOException("Zone " + zoneId + " status cannot be found."); - } - if (zs.isCanceled()) { - throw new IOException("Re-encryption is canceled for zone " + zoneId); + class ZoneTraverseInfo extends TraverseInfo { + private String ezKeyVerName; + + ZoneTraverseInfo(String ezKeyVerName) { + this.ezKeyVerName = ezKeyVerName; } - dir.getFSNamesystem() - .checkNameNodeSafeMode("NN is in safe mode, cannot re-encrypt."); - // re-encryption should be cancelled when NN goes to standby. Just - // double checking for sanity. - dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE); - } - /** - * Called when a new zone is submitted for re-encryption. This will interrupt - * the background thread if it's waiting for the next - * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY. 
- */ - synchronized void notifyNewSubmission() { - LOG.debug("Notifying handler for new re-encryption command."); - this.notify(); + public String getEzKeyVerName() { + return ezKeyVerName; + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java index 3b7badb..a5923a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java @@ -464,7 +464,7 @@ public final class ReencryptionUpdater implements Runnable { final String zonePath; dir.writeLock(); try { - handler.checkZoneReady(task.zoneId); + handler.getTraverser().checkINodeReady(task.zoneId); final INode zoneNode = dir.getInode(task.zoneId); if (zoneNode == null) { // ez removed. http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java index 48d0598..a4372d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -77,7 +77,8 @@ public class StoragePolicySatisfier implements Runnable { private final BlockStorageMovementNeeded storageMovementNeeded; private final BlockStorageMovementAttemptedItems storageMovementsMonitor; private volatile boolean isRunning = false; - + private int spsWorkMultiplier; + private long blockCount = 0L; /** * Represents the collective analysis status for all blocks. */ @@ -106,7 +107,9 @@ public class StoragePolicySatisfier implements Runnable { final BlockManager blkManager, Configuration conf) { this.namesystem = namesystem; this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem, - this); + this, conf.getInt( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT)); this.blockManager = blkManager; this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems( conf.getLong( @@ -117,6 +120,7 @@ public class StoragePolicySatisfier implements Runnable { DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT), storageMovementNeeded, this); + this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf); } /** @@ -143,7 +147,7 @@ public class StoragePolicySatisfier implements Runnable { // Ensure that all the previously submitted block movements(if any) have to // be stopped in all datanodes. 
addDropSPSWorkCommandsToAllDNs(); - storageMovementNeeded.start(); + storageMovementNeeded.init(); storagePolicySatisfierThread = new Daemon(this); storagePolicySatisfierThread.setName("StoragePolicySatisfier"); storagePolicySatisfierThread.start(); @@ -164,7 +168,7 @@ return; } - storageMovementNeeded.stop(); + storageMovementNeeded.close(); storagePolicySatisfierThread.interrupt(); this.storageMovementsMonitor.stop(); @@ -268,9 +272,13 @@ } } } - // TODO: We can think to make this as configurable later, how frequently - // we want to check block movements. - Thread.sleep(3000); + int numLiveDn = namesystem.getFSDirectory().getBlockManager() + .getDatanodeManager().getNumLiveDataNodes(); + if (storageMovementNeeded.size() == 0 + || blockCount > (numLiveDn * spsWorkMultiplier)) { + Thread.sleep(3000); + blockCount = 0L; + } } catch (Throwable t) { handleException(t); } @@ -380,6 +388,11 @@ assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(), blockMovingInfos, coordinatorNode); + int count = 0; + for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { + count = count + blkMovingInfo.getSources().length; + } + blockCount = blockCount + count; return status; } @@ -840,7 +853,7 @@ * - file inode/blockcollection id. */ public void satisfyStoragePolicy(Long inodeId) { - //For file rootId and trackId is same + //For file startId and trackId is same storageMovementNeeded.add(new ItemInfo(inodeId, inodeId)); if (LOG.isDebugEnabled()) { LOG.debug("Added track info for inode {} to block " @@ -864,19 +877,19 @@ * policy. */ public static class ItemInfo { - private long rootId; + private long startId; private long trackId; - public ItemInfo(long rootId, long trackId) { - this.rootId = rootId; + public ItemInfo(long startId, long trackId) { + this.startId = startId; this.trackId = trackId; } /** - * Return the root of the current track Id. + * Return the start inode id of the current track Id. */ - public long getRootId() { - return rootId; + public long getStartId() { + return startId; } /** @@ -890,7 +903,7 @@ * Returns true if the tracking path is a directory, false otherwise. */ public boolean isDir() { - return (rootId != trackId); + return (startId != trackId); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 48799ea..0298997 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4367,6 +4367,29 @@ </property> <property> + <name>dfs.storage.policy.satisfier.queue.limit</name> + <value>1000</value> + <description> + Storage policy satisfier queue size. This queue contains the inode IDs + of the files currently scheduled for satisfying the policy. + Default value is 1000.
+ </description> +</property> + +<property> + <name>dfs.storage.policy.satisfier.work.multiplier.per.iteration</name> + <value>1</value> + <description> + *Note*: Advanced property. Change with caution. + This determines the total number of block transfers to begin in + one iteration, for satisfying the policy. The actual number is obtained by + multiplying this multiplier with the total number of live nodes in the + cluster. The result is the number of block transfers to begin + immediately. This number can be any positive, non-zero integer. + </description> +</property> + +<property> <name>dfs.storage.policy.satisfier.recheck.timeout.millis</name> <value>300000</value> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md index 87817cf..da61842 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md @@ -110,7 +110,7 @@ SPS can be enabled and disabled dynamically without restarting the Namenode. Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HDFS-10285)](https://issues.apache.org/jira/browse/HDFS-10285) -* **Note**: When user invokes `satisfyStoragePolicy()` API on a directory, SPS will consider the files which are immediate to that directory. Sub-directories won't be considered for satisfying the policy. Its user responsibility to call this API on directories recursively, to track all files under the sub tree. +* **Note**: When a user invokes the `satisfyStoragePolicy()` API on a directory, SPS will scan all sub-directories and consider all the files under them for satisfying the policy. * HdfsAdmin API : `public void satisfyStoragePolicy(final Path path) throws IOException` @@ -212,7 +212,6 @@ Get the storage policy of a file or a directory. ### Satisfy Storage Policy Schedule blocks to move based on file's/directory's current storage policy. -Note: For directory case, it will consider immediate files under that directory and it will not consider sub directories recursively.
* Command: http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java index 55ebf9c..7918821 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java @@ -41,7 +41,7 @@ public class TestBlockStorageMovementAttemptedItems { public void setup() throws Exception { unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded( Mockito.mock(Namesystem.class), - Mockito.mock(StoragePolicySatisfier.class)); + Mockito.mock(StoragePolicySatisfier.class), 100); StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, selfRetryTimeout, unsatisfiedStorageMovementFiles, sps); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java index e7b9148..5bce296 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java @@ -191,7 +191,7 @@ public class TestPersistentStoragePolicySatisfier { DFSTestUtil.waitExpectedStorageType( parentFileName, StorageType.ARCHIVE, 3, timeout, fs); DFSTestUtil.waitExpectedStorageType( - childFileName, StorageType.DEFAULT, 3, timeout, fs); + childFileName, StorageType.ARCHIVE, 3, timeout, fs); } finally { clusterShutdown(); @@ -232,7 +232,9 @@ public class TestPersistentStoragePolicySatisfier { DFSTestUtil.waitExpectedStorageType( parentFileName, StorageType.ARCHIVE, 2, timeout, fs); DFSTestUtil.waitExpectedStorageType( - childFileName, StorageType.DEFAULT, 3, timeout, fs); + childFileName, StorageType.DISK, 1, timeout, fs); + DFSTestUtil.waitExpectedStorageType( + childFileName, StorageType.ARCHIVE, 2, timeout, fs); } finally { clusterShutdown(); } @@ -269,7 +271,7 @@ public class TestPersistentStoragePolicySatisfier { DFSTestUtil.waitExpectedStorageType( parentFileName, StorageType.ARCHIVE, 3, timeout, fs); DFSTestUtil.waitExpectedStorageType( - childFileName, StorageType.DEFAULT, 3, timeout, fs); + childFileName, StorageType.ARCHIVE, 3, timeout, fs); } finally { clusterShutdown(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java index f0d6834..7685f31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java @@ -32,7 +32,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Supplier; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -64,7 +63,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; - import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -72,7 +70,6 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import org.junit.rules.Timeout; import org.mockito.internal.util.reflection.Whitebox; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java index e2035ed..3481b42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java @@ -75,6 +75,10 @@ public class TestReencryptionHandler { CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH); Mockito.when(ezm.getProvider()).thenReturn( KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp)); + FSDirectory fsd = Mockito.mock(FSDirectory.class); + FSNamesystem fns = Mockito.mock(FSNamesystem.class); + Mockito.when(fsd.getFSNamesystem()).thenReturn(fns); + Mockito.when(ezm.getFSDirectory()).thenReturn(fsd); return new ReencryptionHandler(ezm, conf); } @@ -99,7 +103,7 @@ public class TestReencryptionHandler { Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked); Whitebox.setInternalState(rh, "taskQueue", queue); final StopWatch sw = new StopWatch().start(); - rh.throttle(); + rh.getTraverser().throttle(); sw.stop(); assertTrue("should have throttled for at least 8 second", sw.now(TimeUnit.MILLISECONDS) > 8000); @@ -130,7 +134,7 @@ public class TestReencryptionHandler { submissions = new HashMap<>(); Whitebox.setInternalState(rh, "submissions", submissions); StopWatch sw = new StopWatch().start(); - rh.throttle(); + rh.getTraverser().throttle(); sw.stop(); assertTrue("should not have throttled", sw.now(TimeUnit.MILLISECONDS) < 1000); @@ -189,7 +193,7 @@ public class TestReencryptionHandler { Whitebox.setInternalState(rh, "submissions", submissions); final StopWatch sw = new StopWatch().start(); removeTaskThread.start(); - rh.throttle(); + 
rh.getTraverser().throttle(); sw.stop(); LOG.info("Throttle completed, consumed {}", sw.now(TimeUnit.MILLISECONDS)); assertTrue("should have throttled for at least 3 second",
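
As a usage illustration of the recursive behaviour documented in the ArchivalStorage.md hunk above, here is a minimal sketch (not part of this patch). The namenode URI and directory path are placeholders, the directory's storage policy is assumed to have been set already, and only the HdfsAdmin#satisfyStoragePolicy(Path) signature quoted in the documentation is relied on.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;

    public class SatisfyPolicyRecursively {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical namenode URI for this sketch.
        HdfsAdmin admin = new HdfsAdmin(URI.create("hdfs://namenode:8020"), conf);
        // /archive/logs is a placeholder directory whose storage policy
        // (e.g. COLD) is assumed to be set already. After HDFS-12291, one
        // call here schedules block moves for every file in the whole
        // sub-tree instead of only the immediate children.
        admin.satisfyStoragePolicy(new Path("/archive/logs"));
      }
    }

Before this change the caller had to walk the tree and invoke the API once per directory, as the removed note in the documentation describes.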

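Similarly, a small sketch of tuning the two new properties added to hdfs-default.xml above. The key names and defaults (1000 and 1) come from this patch; the values below are illustrative only, and in a real deployment they would normally be placed in hdfs-site.xml rather than set programmatically.

    import org.apache.hadoop.conf.Configuration;

    public class SpsTuningSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Bound how many file inode IDs the satisfier keeps queued at a
        // time (default 1000).
        conf.setInt("dfs.storage.policy.satisfier.queue.limit", 2000);
        // The satisfier schedules up to (multiplier x number of live
        // datanodes) block moves per iteration before sleeping (default 1).
        conf.setInt("dfs.storage.policy.satisfier.work.multiplier.per.iteration", 2);
      }
    }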