HDFS-12982 : [SPS]: Reduce the locking and cleanup the Namesystem access. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/113185ee Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/113185ee Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/113185ee Branch: refs/heads/HDFS-10285 Commit: 113185eec03d6d107ac19d267741c760aaef8f9c Parents: e36384a Author: Surendra Singh Lilhore <surendralilh...@apache.org> Authored: Mon Jan 8 15:13:11 2018 +0530 Committer: Rakesh Radhakrishnan <rake...@apache.org> Committed: Tue Jul 31 12:10:10 2018 +0530 ---------------------------------------------------------------------- .../server/blockmanagement/BlockManager.java | 16 +- .../blockmanagement/DatanodeDescriptor.java | 2 +- .../server/blockmanagement/DatanodeManager.java | 22 ++ .../server/namenode/FSDirStatAndListingOp.java | 1 + .../hdfs/server/namenode/FSNamesystem.java | 44 ++- .../hdfs/server/namenode/IntraNNSPSContext.java | 41 -- .../hadoop/hdfs/server/namenode/Namesystem.java | 24 ++ .../sps/BlockStorageMovementAttemptedItems.java | 17 +- .../sps/BlockStorageMovementNeeded.java | 48 ++- .../hdfs/server/namenode/sps/Context.java | 181 +++++++++ .../namenode/sps/IntraSPSNameNodeContext.java | 220 +++++++++++ .../namenode/sps/StoragePolicySatisfier.java | 374 +++++++++---------- .../TestBlockStorageMovementAttemptedItems.java | 17 +- .../sps/TestStoragePolicySatisfier.java | 25 +- 14 files changed, 742 insertions(+), 290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 1cf687e..c2d5162 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -89,11 +89,12 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.INodesInPath; -import org.apache.hadoop.hdfs.server.namenode.IntraNNSPSContext; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; +import org.apache.hadoop.hdfs.server.namenode.sps.Context; +import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; @@ -433,6 +434,7 @@ public class BlockManager implements BlockStatsMXBean { private final StoragePolicySatisfier sps; private final boolean storagePolicyEnabled; private boolean spsEnabled; + private Context spsctxt = null; /** Minimum live replicas needed for the datanode to be transitioned * from ENTERING_MAINTENANCE to IN_MAINTENANCE. 
*/ @@ -479,8 +481,8 @@ public class BlockManager implements BlockStatsMXBean { conf.getBoolean( DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT); - StoragePolicySatisfier.Context spsctxt = new IntraNNSPSContext(namesystem); - sps = new StoragePolicySatisfier(namesystem, this, conf, spsctxt); + spsctxt = new IntraSPSNameNodeContext(namesystem, this, conf); + sps = new StoragePolicySatisfier(spsctxt); blockTokenSecretManager = createBlockTokenSecretManager(conf); providedStorageMap = new ProvidedStorageMap(namesystem, this, conf); @@ -5031,8 +5033,8 @@ public class BlockManager implements BlockStatsMXBean { LOG.info("Storage policy satisfier is already running."); return; } - - sps.start(false); + // TODO: FSDirectory will get removed via HDFS-12911 modularization work + sps.start(false, namesystem.getFSDirectory()); } /** @@ -5068,8 +5070,8 @@ public class BlockManager implements BlockStatsMXBean { LOG.info("Storage policy satisfier is already running."); return; } - - sps.start(true); + // TODO: FSDirectory will get removed via HDFS-12911 modularization work + sps.start(true, namesystem.getFSDirectory()); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index f9a76b4..b09d908 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -802,7 +802,7 @@ public class 
DatanodeDescriptor extends DatanodeInfo { } /** Increment the number of blocks scheduled. */ - void incrementBlocksScheduled(StorageType t) { + public void incrementBlocksScheduled(StorageType t) { currApproxBlocksScheduled.add(t, 1); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 6aab5e9..c24a38b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.net.DFSNetworkTopology; import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; @@ -2045,5 +2046,26 @@ public class DatanodeManager { } } } + + /** + * Generates datanode reports for the given report type. 
+ * + * @param type + * type of the datanode report + * @return array of DatanodeStorageReports + */ + public DatanodeStorageReport[] getDatanodeStorageReport( + DatanodeReportType type) { + final List<DatanodeDescriptor> datanodes = getDatanodeListForReport(type); + + DatanodeStorageReport[] reports = new DatanodeStorageReport[datanodes + .size()]; + for (int i = 0; i < reports.length; i++) { + final DatanodeDescriptor d = datanodes.get(i); + reports[i] = new DatanodeStorageReport( + new DatanodeInfoBuilder().setFrom(d).build(), d.getStorageReports()); + } + return reports; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java index 7e22ae1..709e270 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java @@ -90,6 +90,7 @@ class FSDirStatAndListingOp { * @param srcArg The string representation of the path to the file * @param resolveLink whether to throw UnresolvedLinkException * if src refers to a symlink + * @param needLocation if blockLocations need to be returned * * @param needLocation Include {@link LocatedBlocks} in result. * @param needBlockToken Include block tokens in {@link LocatedBlocks}. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 3f6e4b3..25a45c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3133,6 +3133,29 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * @param src The string representation of the path to the file * @param resolveLink whether to throw UnresolvedLinkException * if src refers to a symlink + * @param needLocation if blockLocations need to be returned + * + * @throws AccessControlException + * if access is denied + * @throws UnresolvedLinkException + * if a symlink is encountered. + * + * @return object containing information regarding the file or null if file + * not found + * @throws StandbyException + */ + @Override + public HdfsFileStatus getFileInfo(final String src, boolean resolveLink, + boolean needLocation) throws IOException { + return getFileInfo(src, resolveLink, needLocation, false); + } + + /** + * Get the file info for a specific file. + * + * @param src The string representation of the path to the file + * @param resolveLink whether to throw UnresolvedLinkException + * if src refers to a symlink * * @param needLocation Include {@link LocatedBlocks} in result. 
* @param needBlockToken Include block tokens in {@link LocatedBlocks} @@ -3167,6 +3190,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return stat; } + @Override + public String getFilePath(Long inodeId) { + readLock(); + try { + INode inode = getFSDirectory().getInode(inodeId); + return inode == null ? null : inode.getFullPathName(); + } finally { + readUnlock(); + } + } + /** * Returns true if the file is closed */ @@ -4459,15 +4493,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, try { checkOperation(OperationCategory.UNCHECKED); final DatanodeManager dm = getBlockManager().getDatanodeManager(); - final List<DatanodeDescriptor> datanodes = dm.getDatanodeListForReport(type); - - reports = new DatanodeStorageReport[datanodes.size()]; - for (int i = 0; i < reports.length; i++) { - final DatanodeDescriptor d = datanodes.get(i); - reports[i] = new DatanodeStorageReport( - new DatanodeInfoBuilder().setFrom(d).build(), - d.getStorageReports()); - } + reports = dm.getDatanodeStorageReport(type); } finally { readUnlock("getDatanodeStorageReport"); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java deleted file mode 100644 index 111cabb..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode; - -import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; - -/** - * This class is the Namenode implementation for analyzing the file blocks which - * are expecting to change its storages and assigning the block storage - * movements to satisfy the storage policy. - */ -// TODO: Now, added one API which is required for sps package. Will refine -// this interface via HDFS-12911. 
-public class IntraNNSPSContext implements StoragePolicySatisfier.Context { - private final Namesystem namesystem; - - public IntraNNSPSContext(Namesystem namesystem) { - this.namesystem = namesystem; - } - - @Override - public int getNumLiveDataNodes() { - return namesystem.getFSDirectory().getBlockManager().getDatanodeManager() - .getNumLiveDataNodes(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java index e58fa72..fc933b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.util.RwLock; @@ -62,4 +63,27 @@ public interface Namesystem extends RwLock, SafeMode { * @throws IOException */ void removeXattr(long id, String xattrName) throws IOException; + + /** + * Gets the fileInfo of the given file path. 
+ * + * @param filePath string representation of the path to the file + * @param resolveLink whether to throw UnresolvedLinkException + * if filePath refers to a symlink + * @param needLocation if blockLocations need to be returned + * + * @return hdfs file status details + * @throws IOException + */ + HdfsFileStatus getFileInfo(String filePath, boolean resolveLink, + boolean needLocation) throws IOException; + + /** + * Gets the file path corresponding to the given file id. + * + * @param inodeId + * file id + * @return string file path + */ + String getFilePath(Long inodeId); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java index b044f30..1cae027 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java @@ -25,6 +25,11 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY; + import
org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo; @@ -66,15 +71,21 @@ public class BlockStorageMovementAttemptedItems { // private long minCheckTimeout = 1 * 60 * 1000; // minimum value private BlockStorageMovementNeeded blockStorageMovementNeeded; + private final Context ctxt; - public BlockStorageMovementAttemptedItems(long recheckTimeout, - long selfRetryTimeout, + public BlockStorageMovementAttemptedItems(Context context, BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) { + this.ctxt = context; + long recheckTimeout = ctxt.getConf().getLong( + DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, + DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT); if (recheckTimeout > 0) { this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout); } - this.selfRetryTimeout = selfRetryTimeout; + this.selfRetryTimeout = ctxt.getConf().getLong( + DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY, + DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT); this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles; storageMovementAttemptedItems = new ArrayList<>(); movementFinishedBlocks = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java index 5635621..80f1893 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java @@ -17,7 +17,8 @@ */ package org.apache.hadoop.hdfs.server.namenode.sps; -import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY; import java.io.IOException; import java.util.ArrayList; @@ -35,10 +36,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathSta import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.namenode.FSDirectory; import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser; +import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo; import org.apache.hadoop.hdfs.server.namenode.INode; -import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo; -import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Time; import org.slf4j.Logger; @@ -73,13 +73,11 @@ public class BlockStorageMovementNeeded { private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus = new ConcurrentHashMap<>(); - private final Namesystem namesystem; + private final Context ctxt; // List of pending dir to satisfy the policy private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>(); - private final StoragePolicySatisfier sps; - private Daemon inodeIdCollector; private final int maxQueuedItem; @@ -88,11 +86,11 @@ public class BlockStorageMovementNeeded { // NOT_AVAILABLE. 
private static long statusClearanceElapsedTimeMs = 300000; - public BlockStorageMovementNeeded(Namesystem namesystem, - StoragePolicySatisfier sps, int queueLimit) { - this.namesystem = namesystem; - this.sps = sps; - this.maxQueuedItem = queueLimit; + public BlockStorageMovementNeeded(Context context) { + this.ctxt = context; + this.maxQueuedItem = ctxt.getConf().getInt( + DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, + DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT); } /** @@ -188,8 +186,7 @@ public class BlockStorageMovementNeeded { // If track is part of some start inode then reduce the pending // directory work count. long startId = trackInfo.getStartId(); - INode inode = namesystem.getFSDirectory().getInode(startId); - if (inode == null) { + if (!ctxt.isFileExist(startId)) { // directory deleted just remove it. this.pendingWorkForDirectory.remove(startId); updateStatus(startId, isSuccess); @@ -198,7 +195,7 @@ public class BlockStorageMovementNeeded { if (pendingWork != null) { pendingWork.decrementPendingWorkCount(); if (pendingWork.isDirWorkDone()) { - namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY); + ctxt.removeSPSHint(startId); pendingWorkForDirectory.remove(startId); pendingWork.setFailure(!isSuccess); updateStatus(startId, pendingWork.isPolicySatisfied()); @@ -209,8 +206,7 @@ public class BlockStorageMovementNeeded { } else { // Remove xAttr if trackID doesn't exist in // storageMovementAttemptedItems or file policy satisfied. 
- namesystem.removeXattr(trackInfo.getTrackId(), - XATTR_SATISFY_STORAGE_POLICY); + ctxt.removeSPSHint(trackInfo.getTrackId()); updateStatus(trackInfo.getStartId(), isSuccess); } } @@ -256,7 +252,7 @@ public class BlockStorageMovementNeeded { while ((trackId = spsDirsToBeTraveresed.poll()) != null) { try { // Remove xAttr for file - namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY); + ctxt.removeSPSHint(trackId); } catch (IOException ie) { LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie); } @@ -269,8 +265,7 @@ public class BlockStorageMovementNeeded { try { // Remove xAttr for file if (!itemInfo.isDir()) { - namesystem.removeXattr(itemInfo.getTrackId(), - XATTR_SATISFY_STORAGE_POLICY); + ctxt.removeSPSHint(itemInfo.getTrackId()); } } catch (IOException ie) { LOG.warn( @@ -300,10 +295,9 @@ public class BlockStorageMovementNeeded { public void run() { LOG.info("Starting FileInodeIdCollector!."); long lastStatusCleanTime = 0; - while (namesystem.isRunning() && sps.isRunning()) { + while (ctxt.isRunning()) { try { - if (!namesystem.isInSafeMode()) { - FSDirectory fsd = namesystem.getFSDirectory(); + if (!ctxt.isInSafeMode()) { Long startINodeId = spsDirsToBeTraveresed.poll(); if (startINodeId == null) { // Waiting for SPS path @@ -311,7 +305,7 @@ public class BlockStorageMovementNeeded { spsDirsToBeTraveresed.wait(5000); } } else { - INode startInode = fsd.getInode(startINodeId); + INode startInode = getFSDirectory().getInode(startINodeId); if (startInode != null) { try { remainingCapacity = remainingCapacity(); @@ -333,8 +327,7 @@ public class BlockStorageMovementNeeded { DirPendingWorkInfo dirPendingWorkInfo = pendingWorkForDirectory.get(startInode.getId()); if (dirPendingWorkInfo.isDirWorkDone()) { - namesystem.removeXattr(startInode.getId(), - XATTR_SATISFY_STORAGE_POLICY); + ctxt.removeSPSHint(startInode.getId()); pendingWorkForDirectory.remove(startInode.getId()); updateStatus(startInode.getId(), true); } @@ -483,9 +476,10 @@ public 
class BlockStorageMovementNeeded { } } - public void init() { + // TODO: FSDirectory will get removed via HDFS-12911 modularization work + public void init(FSDirectory fsd) { inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector( - namesystem.getFSDirectory())); + fsd)); inodeIdCollector.setName("FileInodeIdCollector"); inodeIdCollector.start(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java new file mode 100644 index 0000000..d11e26f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.namenode.sps; + +import java.io.IOException; +import java.util.function.Supplier; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.AccessControlException; + +/** + * An interface for the communication between NameNode and SPS module. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface Context { + + /** + * Returns configuration object. + */ + Configuration getConf(); + + /** + * Returns true if the SPS is running, false otherwise. + */ + boolean isRunning(); + + /** + * Update the SPS running status. + * + * @param isSpsRunning + * true represents running, false otherwise + */ + void setSPSRunning(Supplier<Boolean> isSpsRunning); + + /** + * Returns true if the Namenode is in safe mode, false otherwise. + */ + boolean isInSafeMode(); + + /** + * Returns true if Mover tool is already running, false otherwise. + */ + boolean isMoverRunning(); + + /** + * Gets the Inode ID number for the given path. + * + * @param path + * - file/dir path + * @return Inode id number + */ + long getFileID(String path) throws UnresolvedLinkException, + AccessControlException, ParentNotDirectoryException; + + /** + * Gets the network topology.
+ * + * @return network topology + */ + NetworkTopology getNetworkTopology(); + + /** + * Returns true if the given Inode exists in the Namespace. + * + * @param inodeId + * - Inode ID + * @return true if Inode exists, false otherwise. + */ + boolean isFileExist(long inodeId); + + /** + * Gets the storage policy details for the given policy ID. + * + * @param policyId + * - Storage policy ID + * @return the detailed policy object + */ + BlockStoragePolicy getStoragePolicy(byte policyId); + + /** + * Drop the SPS work in case any previous work is queued up. + */ + void addDropPreviousSPSWorkAtDNs(); + + /** + * Remove the hint which was added to track SPS call. + * + * @param inodeId + * - Inode ID + * @throws IOException + */ + void removeSPSHint(long inodeId) throws IOException; + + /** + * Gets the number of live datanodes in the cluster. + * + * @return number of live datanodes + */ + int getNumLiveDataNodes(); + + /** + * Get the file info for a specific file. + * + * @param inodeID + * inode identifier + * @return file status metadata information + */ + HdfsFileStatus getFileInfo(long inodeID) throws IOException; + + /** + * Returns all the live datanodes and their storage details. + * + * @throws IOException + */ + DatanodeStorageReport[] getLiveDatanodeStorageReport() + throws IOException; + + /** + * Returns true if the given inode file has low redundancy blocks. + * + * @param inodeID + * inode identifier + * @return true if block collection has low redundancy blocks + */ + boolean hasLowRedundancyBlocks(long inodeID); + + /** + * Assign the given block movement task to the target node present in + * {@link BlockMovingInfo}. + * + * @param blkMovingInfo + * block to storage info + * @throws IOException + */ + void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo) + throws IOException; + + /** + * Checks whether the given datanode has sufficient space to occupy the given + * blockSize data.
+ * + * @param dn + * datanode info + * @param type + * storage type + * @param blockSize + * blockSize to be scheduled + * @return true if the given datanode has sufficient space to occupy blockSize + * data, false otherwise. + */ + boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn, + StorageType type, long blockSize); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java new file mode 100644 index 0000000..6654212 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode.sps; + +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; + +import java.io.IOException; +import java.util.function.Supplier; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.namenode.INode; +import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.AccessControlException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is the Namenode implementation for analyzing the file blocks which + * are expecting to change its storages and assigning the block storage + * movements to satisfy the storage policy. 
+ */ +public class IntraSPSNameNodeContext implements Context { + private static final Logger LOG = LoggerFactory + .getLogger(IntraSPSNameNodeContext.class); + + private final Namesystem namesystem; + private final BlockManager blockManager; + private final Configuration conf; + private Supplier<Boolean> isSpsRunning; + + public IntraSPSNameNodeContext(Namesystem namesystem, + BlockManager blockManager, Configuration conf) { + this.namesystem = namesystem; + this.blockManager = blockManager; + this.conf = conf; + isSpsRunning = () -> false; + } + + @Override + public int getNumLiveDataNodes() { + return blockManager.getDatanodeManager().getNumLiveDataNodes(); + } + + @Override + public HdfsFileStatus getFileInfo(long inodeID) throws IOException { + String filePath = namesystem.getFilePath(inodeID); + if (StringUtils.isBlank(filePath)) { + LOG.debug("File with inodeID:{} doesn't exists!", inodeID); + return null; + } + HdfsFileStatus fileInfo = null; + try { + fileInfo = namesystem.getFileInfo(filePath, true, true); + } catch (IOException e) { + LOG.debug("File path:{} doesn't exists!", filePath); + } + return fileInfo; + } + + @Override + public DatanodeStorageReport[] getLiveDatanodeStorageReport() + throws IOException { + namesystem.readLock(); + try { + return blockManager.getDatanodeManager() + .getDatanodeStorageReport(DatanodeReportType.LIVE); + } finally { + namesystem.readUnlock(); + } + } + + @Override + public boolean hasLowRedundancyBlocks(long inodeID) { + namesystem.readLock(); + try { + BlockCollection bc = namesystem.getBlockCollection(inodeID); + return blockManager.hasLowRedundancyBlocks(bc); + } finally { + namesystem.readUnlock(); + } + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public boolean isFileExist(long inodeId) { + return namesystem.getFSDirectory().getInode(inodeId) != null; + } + + @Override + public void removeSPSHint(long inodeId) throws IOException { + this.namesystem.removeXattr(inodeId, 
XATTR_SATISFY_STORAGE_POLICY); + } + + @Override + public boolean isRunning() { + // TODO : 'isSpsRunning' flag has been added to avoid the NN lock inside + // SPS. Context interface will be further refined as part of HDFS-12911 + // modularization task. One idea is to introduce a cleaner interface similar + // to Namesystem for better abstraction. + return namesystem.isRunning() && isSpsRunning.get(); + } + + @Override + public void setSPSRunning(Supplier<Boolean> spsRunningFlag) { + this.isSpsRunning = spsRunningFlag; + } + + @Override + public boolean isInSafeMode() { + return namesystem.isInSafeMode(); + } + + @Override + public boolean isMoverRunning() { + String moverId = HdfsServerConstants.MOVER_ID_PATH.toString(); + return namesystem.isFileOpenedForWrite(moverId); + } + + @Override + public void addDropPreviousSPSWorkAtDNs() { + namesystem.readLock(); + try { + blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs(); + } finally { + namesystem.readUnlock(); + } + } + + @Override + public BlockStoragePolicy getStoragePolicy(byte policyID) { + return blockManager.getStoragePolicy(policyID); + } + + @Override + public NetworkTopology getNetworkTopology() { + return blockManager.getDatanodeManager().getNetworkTopology(); + } + + @Override + public long getFileID(String path) throws UnresolvedLinkException, + AccessControlException, ParentNotDirectoryException { + namesystem.readLock(); + try { + INode inode = namesystem.getFSDirectory().getINode(path); + return inode == null ? 
-1 : inode.getId(); + } finally { + namesystem.readUnlock(); + } + } + + @Override + public void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo) + throws IOException { + namesystem.readLock(); + try { + DatanodeDescriptor dn = blockManager.getDatanodeManager() + .getDatanode(blkMovingInfo.getTarget().getDatanodeUuid()); + if (dn == null) { + throw new IOException("Failed to schedule block movement task:" + + blkMovingInfo + " as target datanode: " + + blkMovingInfo.getTarget() + " doesn't exists"); + } + dn.addBlocksToMoveStorage(blkMovingInfo); + dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType()); + } finally { + namesystem.readUnlock(); + } + } + + @Override + public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn, + StorageType type, long blockSize) { + namesystem.readLock(); + try { + DatanodeDescriptor datanode = blockManager.getDatanodeManager() + .getDatanode(dn.getDatanodeUuid()); + if (datanode == null) { + LOG.debug("Target datanode: " + dn + " doesn't exists"); + return false; + } + return null != datanode.chooseStorage4Block(type, blockSize); + } finally { + namesystem.readUnlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java index 0d4bb19..b3e6b78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -29,29 +29,28 @@ import java.util.LinkedList; import 
java.util.List; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.balancer.Matcher; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; -import org.apache.hadoop.hdfs.server.namenode.INode; -import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.namenode.FSDirectory; import 
org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.util.Daemon; @@ -79,8 +78,6 @@ public class StoragePolicySatisfier implements Runnable { public static final Logger LOG = LoggerFactory.getLogger(StoragePolicySatisfier.class); private Daemon storagePolicySatisfierThread; - private final Namesystem namesystem; - private final BlockManager blockManager; private final BlockStorageMovementNeeded storageMovementNeeded; private final BlockStorageMovementAttemptedItems storageMovementsMonitor; private volatile boolean isRunning = false; @@ -90,16 +87,6 @@ public class StoragePolicySatisfier implements Runnable { private final Context ctxt; /** - * An interface for analyzing and assigning the block storage movements to - * worker nodes. - */ - // TODO: Now, added one API which is required for sps package. Will refine - // this interface via HDFS-12911. - public interface Context { - int getNumLiveDataNodes(); - } - - /** * Represents the collective analysis status for all blocks. */ private static class BlocksMovingAnalysis { @@ -124,7 +111,9 @@ public class StoragePolicySatisfier implements Runnable { BLOCKS_TARGET_PAIRING_SKIPPED, // Represents that, All the reported blocks are satisfied the policy but // some of the blocks are low redundant. - FEW_LOW_REDUNDANCY_BLOCKS + FEW_LOW_REDUNDANCY_BLOCKS, + // Represents that, movement failures due to unexpected errors. 
+ BLOCKS_FAILED_TO_MOVE } private Status status = null; @@ -136,36 +125,27 @@ public class StoragePolicySatisfier implements Runnable { } } - public StoragePolicySatisfier(final Namesystem namesystem, - final BlockManager blkManager, Configuration conf, Context ctxt) { - this.namesystem = namesystem; - this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem, - this, conf.getInt( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT)); - this.blockManager = blkManager; - this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems( - conf.getLong( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT), - conf.getLong( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY, - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT), + public StoragePolicySatisfier(Context ctxt) { + this.ctxt = ctxt; + this.storageMovementNeeded = new BlockStorageMovementNeeded(ctxt); + this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(ctxt, storageMovementNeeded); - this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf); - this.blockMovementMaxRetry = conf.getInt( + this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(ctxt.getConf()); + this.blockMovementMaxRetry = ctxt.getConf().getInt( DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT); - this.ctxt = ctxt; } /** * Start storage policy satisfier daemon thread. Also start block storage * movements monitor to retry the attempts if needed. + * + * // TODO: FSDirectory will get removed via HDFS-12911 modularization work. 
*/ - public synchronized void start(boolean reconfigStart) { + public synchronized void start(boolean reconfigStart, FSDirectory fsd) { isRunning = true; - if (checkIfMoverRunning()) { + ctxt.setSPSRunning(this::isRunning); + if (ctxt.isMoverRunning()) { isRunning = false; LOG.error( "Stopping StoragePolicySatisfier thread " + "as Mover ID file " @@ -183,7 +163,7 @@ public class StoragePolicySatisfier implements Runnable { // Ensure that all the previously submitted block movements(if any) have to // be stopped in all datanodes. addDropSPSWorkCommandsToAllDNs(); - storageMovementNeeded.init(); + storageMovementNeeded.init(fsd); storagePolicySatisfierThread = new Daemon(this); storagePolicySatisfierThread.setName("StoragePolicySatisfier"); storagePolicySatisfierThread.start(); @@ -199,7 +179,6 @@ public class StoragePolicySatisfier implements Runnable { */ public synchronized void disable(boolean forceStop) { isRunning = false; - if (storagePolicySatisfierThread == null) { return; } @@ -242,25 +221,19 @@ public class StoragePolicySatisfier implements Runnable { return isRunning; } - // Return true if a Mover instance is running - private boolean checkIfMoverRunning() { - String moverId = HdfsServerConstants.MOVER_ID_PATH.toString(); - return namesystem.isFileOpenedForWrite(moverId); - } - /** * Adding drop commands to all datanodes to stop performing the satisfier * block movements, if any. 
*/ private void addDropSPSWorkCommandsToAllDNs() { - this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs(); + ctxt.addDropPreviousSPSWorkAtDNs(); } @Override public void run() { - while (namesystem.isRunning() && isRunning) { + while (ctxt.isRunning()) { try { - if (!namesystem.isInSafeMode()) { + if (!ctxt.isInSafeMode()) { ItemInfo itemInfo = storageMovementNeeded.get(); if (itemInfo != null) { if(itemInfo.getRetryCount() >= blockMovementMaxRetry){ @@ -271,25 +244,28 @@ public class StoragePolicySatisfier implements Runnable { continue; } long trackId = itemInfo.getTrackId(); - BlockCollection blockCollection; BlocksMovingAnalysis status = null; - try { - namesystem.readLock(); - blockCollection = namesystem.getBlockCollection(trackId); - // Check blockCollectionId existence. - if (blockCollection == null) { - // File doesn't exists (maybe got deleted), remove trackId from - // the queue - storageMovementNeeded.removeItemTrackInfo(itemInfo, true); - } else { - status = - analyseBlocksStorageMovementsAndAssignToDN( - blockCollection); - } - } finally { - namesystem.readUnlock(); - } - if (blockCollection != null) { + DatanodeStorageReport[] liveDnReports; + BlockStoragePolicy existingStoragePolicy; + // TODO: presently, context internally acquire the lock + // and returns the result. Need to discuss to move the lock outside? + boolean hasLowRedundancyBlocks = ctxt + .hasLowRedundancyBlocks(trackId); + HdfsFileStatus fileStatus = ctxt.getFileInfo(trackId); + // Check path existence. 
+ if (fileStatus == null || fileStatus.isDir()) { + // File doesn't exists (maybe got deleted) or its a directory, + // just remove trackId from the queue + storageMovementNeeded.removeItemTrackInfo(itemInfo, true); + } else { + liveDnReports = ctxt.getLiveDatanodeStorageReport(); + byte existingStoragePolicyID = fileStatus.getStoragePolicy(); + existingStoragePolicy = ctxt + .getStoragePolicy(existingStoragePolicyID); + + HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus; + status = analyseBlocksStorageMovementsAndAssignToDN(file, + hasLowRedundancyBlocks, existingStoragePolicy, liveDnReports); switch (status.status) { // Just add to monitor, so it will be retried after timeout case ANALYSIS_SKIPPED_FOR_RETRY: @@ -317,6 +293,14 @@ public class StoragePolicySatisfier implements Runnable { } this.storageMovementNeeded.add(itemInfo); break; + case BLOCKS_FAILED_TO_MOVE: + if (LOG.isDebugEnabled()) { + LOG.debug("Adding trackID " + trackId + + " back to retry queue as some of the blocks" + + " movement failed."); + } + this.storageMovementNeeded.add(itemInfo); + break; // Just clean Xattrs case BLOCKS_TARGET_PAIRING_SKIPPED: case BLOCKS_ALREADY_SATISFIED: @@ -350,14 +334,11 @@ public class StoragePolicySatisfier implements Runnable { // Stopping monitor thread and clearing queues as well this.clearQueues(); this.storageMovementsMonitor.stopGracefully(); - if (!namesystem.isRunning()) { - LOG.info("Stopping StoragePolicySatisfier."); - if (!(t instanceof InterruptedException)) { - LOG.info("StoragePolicySatisfier received an exception" - + " while shutting down.", t); - } - return; + if (!(t instanceof InterruptedException)) { + LOG.info("StoragePolicySatisfier received an exception" + + " while shutting down.", t); } + LOG.info("Stopping StoragePolicySatisfier."); } } } @@ -367,41 +348,43 @@ public class StoragePolicySatisfier implements Runnable { } private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN( - BlockCollection 
blockCollection) { + HdfsLocatedFileStatus fileInfo, boolean hasLowRedundancyBlocks, + BlockStoragePolicy existingStoragePolicy, + DatanodeStorageReport[] liveDns) { BlocksMovingAnalysis.Status status = BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED; - byte existingStoragePolicyID = blockCollection.getStoragePolicyID(); - BlockStoragePolicy existingStoragePolicy = - blockManager.getStoragePolicy(existingStoragePolicyID); - if (!blockCollection.getLastBlock().isComplete()) { + final ErasureCodingPolicy ecPolicy = fileInfo.getErasureCodingPolicy(); + final LocatedBlocks locatedBlocks = fileInfo.getLocatedBlocks(); + final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete(); + if (!lastBlkComplete) { // Postpone, currently file is under construction // So, should we add back? or leave it to user LOG.info("BlockCollectionID: {} file is under construction. So, postpone" - + " this to the next retry iteration", blockCollection.getId()); + + " this to the next retry iteration", fileInfo.getFileId()); return new BlocksMovingAnalysis( BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY, new ArrayList<>()); } - BlockInfo[] blocks = blockCollection.getBlocks(); - if (blocks.length == 0) { + List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks(); + if (blocks.size() == 0) { LOG.info("BlockCollectionID: {} file is not having any blocks." 
- + " So, skipping the analysis.", blockCollection.getId()); + + " So, skipping the analysis.", fileInfo.getFileId()); return new BlocksMovingAnalysis( BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED, new ArrayList<>()); } List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>(); - for (int i = 0; i < blocks.length; i++) { - BlockInfo blockInfo = blocks[i]; + for (int i = 0; i < blocks.size(); i++) { + LocatedBlock blockInfo = blocks.get(i); List<StorageType> expectedStorageTypes; if (blockInfo.isStriped()) { if (ErasureCodingPolicyManager .checkStoragePolicySuitableForECStripedMode( - existingStoragePolicyID)) { + existingStoragePolicy.getId())) { expectedStorageTypes = existingStoragePolicy - .chooseStorageTypes((short) blockInfo.getCapacity()); + .chooseStorageTypes((short) blockInfo.getLocations().length); } else { // Currently we support only limited policies (HOT, COLD, ALLSSD) // for EC striped mode files. SPS will ignore to move the blocks if @@ -415,22 +398,16 @@ public class StoragePolicySatisfier implements Runnable { } } else { expectedStorageTypes = existingStoragePolicy - .chooseStorageTypes(blockInfo.getReplication()); + .chooseStorageTypes(fileInfo.getReplication()); } - DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo); - StorageType[] storageTypes = new StorageType[storages.length]; - for (int j = 0; j < storages.length; j++) { - DatanodeStorageInfo datanodeStorageInfo = storages[j]; - StorageType storageType = datanodeStorageInfo.getStorageType(); - storageTypes[j] = storageType; - } - List<StorageType> existing = - new LinkedList<StorageType>(Arrays.asList(storageTypes)); + List<StorageType> existing = new LinkedList<StorageType>( + Arrays.asList(blockInfo.getStorageTypes())); if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes, existing, true)) { boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos, - blockInfo, expectedStorageTypes, existing, storages); + blockInfo, 
expectedStorageTypes, existing, blockInfo.getLocations(), + liveDns, ecPolicy); if (blocksPaired) { status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED; } else { @@ -439,7 +416,7 @@ public class StoragePolicySatisfier implements Runnable { status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED; } } else { - if (blockManager.hasLowRedundancyBlocks(blockCollection)) { + if (hasLowRedundancyBlocks) { status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS; } } @@ -448,13 +425,15 @@ public class StoragePolicySatisfier implements Runnable { List<Block> assignedBlockIds = new ArrayList<Block>(); for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { // Check for at least one block storage movement has been chosen - if (blkMovingInfo.getTarget() != null) { - // assign block storage movement task to the target node - ((DatanodeDescriptor) blkMovingInfo.getTarget()) - .addBlocksToMoveStorage(blkMovingInfo); + try { + ctxt.assignBlockMoveTaskToTargetNode(blkMovingInfo); LOG.debug("BlockMovingInfo: {}", blkMovingInfo); assignedBlockIds.add(blkMovingInfo.getBlock()); blockCount++; + } catch (IOException e) { + LOG.warn("Exception while scheduling movement task", e); + // failed to move the block. 
+ status = BlocksMovingAnalysis.Status.BLOCKS_FAILED_TO_MOVE; } } return new BlocksMovingAnalysis(status, assignedBlockIds); @@ -481,29 +460,29 @@ public class StoragePolicySatisfier implements Runnable { * satisfy the storage policy, true otherwise */ private boolean computeBlockMovingInfos( - List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo, + List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo, List<StorageType> expectedStorageTypes, List<StorageType> existing, - DatanodeStorageInfo[] storages) { + DatanodeInfo[] storages, DatanodeStorageReport[] liveDns, + ErasureCodingPolicy ecPolicy) { boolean foundMatchingTargetNodesForBlock = true; if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes, existing, true)) { List<StorageTypeNodePair> sourceWithStorageMap = new ArrayList<StorageTypeNodePair>(); - List<DatanodeStorageInfo> existingBlockStorages = - new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages)); + List<DatanodeInfo> existingBlockStorages = new ArrayList<DatanodeInfo>( + Arrays.asList(storages)); // if expected type exists in source node already, local movement would be // possible, so lets find such sources first. 
- Iterator<DatanodeStorageInfo> iterator = existingBlockStorages.iterator(); + Iterator<DatanodeInfo> iterator = existingBlockStorages.iterator(); while (iterator.hasNext()) { - DatanodeStorageInfo datanodeStorageInfo = iterator.next(); - if (checkSourceAndTargetTypeExists( - datanodeStorageInfo.getDatanodeDescriptor(), existing, - expectedStorageTypes)) { + DatanodeInfoWithStorage dnInfo = (DatanodeInfoWithStorage) iterator + .next(); + if (checkSourceAndTargetTypeExists(dnInfo, existing, + expectedStorageTypes, liveDns)) { sourceWithStorageMap - .add(new StorageTypeNodePair(datanodeStorageInfo.getStorageType(), - datanodeStorageInfo.getDatanodeDescriptor())); + .add(new StorageTypeNodePair(dnInfo.getStorageType(), dnInfo)); iterator.remove(); - existing.remove(datanodeStorageInfo.getStorageType()); + existing.remove(dnInfo.getStorageType()); } } @@ -511,23 +490,25 @@ public class StoragePolicySatisfier implements Runnable { for (StorageType existingType : existing) { iterator = existingBlockStorages.iterator(); while (iterator.hasNext()) { - DatanodeStorageInfo datanodeStorageInfo = iterator.next(); - StorageType storageType = datanodeStorageInfo.getStorageType(); + DatanodeInfoWithStorage dnStorageInfo = + (DatanodeInfoWithStorage) iterator.next(); + StorageType storageType = dnStorageInfo.getStorageType(); if (storageType == existingType) { iterator.remove(); sourceWithStorageMap.add(new StorageTypeNodePair(storageType, - datanodeStorageInfo.getDatanodeDescriptor())); + dnStorageInfo)); break; } } } StorageTypeNodeMap locsForExpectedStorageTypes = - findTargetsForExpectedStorageTypes(expectedStorageTypes); + findTargetsForExpectedStorageTypes(expectedStorageTypes, liveDns); foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove( blockMovingInfos, blockInfo, sourceWithStorageMap, - expectedStorageTypes, locsForExpectedStorageTypes); + expectedStorageTypes, locsForExpectedStorageTypes, + ecPolicy); } return foundMatchingTargetNodesForBlock; } @@ -550,12 
+531,13 @@ public class StoragePolicySatisfier implements Runnable { * satisfy the storage policy */ private boolean findSourceAndTargetToMove( - List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo, + List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo, List<StorageTypeNodePair> sourceWithStorageList, List<StorageType> expected, - StorageTypeNodeMap locsForExpectedStorageTypes) { + StorageTypeNodeMap locsForExpectedStorageTypes, + ErasureCodingPolicy ecPolicy) { boolean foundMatchingTargetNodesForBlock = true; - List<DatanodeDescriptor> excludeNodes = new ArrayList<>(); + List<DatanodeInfo> excludeNodes = new ArrayList<>(); // Looping over all the source node locations and choose the target // storage within same node if possible. This is done separately to @@ -566,13 +548,14 @@ public class StoragePolicySatisfier implements Runnable { // Check whether the block replica is already placed in the expected // storage type in this source datanode. if (!expected.contains(existingTypeNodePair.storageType)) { - StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode( - blockInfo, existingTypeNodePair.dn, expected); + StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(blockInfo, + existingTypeNodePair.dn, expected); if (chosenTarget != null) { if (blockInfo.isStriped()) { buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn, existingTypeNodePair.storageType, chosenTarget.dn, - chosenTarget.storageType, blockMovingInfos); + chosenTarget.storageType, blockMovingInfos, + ecPolicy); } else { buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn, existingTypeNodePair.storageType, chosenTarget.dn, @@ -596,7 +579,7 @@ public class StoragePolicySatisfier implements Runnable { if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) { continue; } - if (chosenTarget == null && blockManager.getDatanodeManager() + if (chosenTarget == null && ctxt .getNetworkTopology().isNodeGroupAware()) { chosenTarget = 
chooseTarget(blockInfo, existingTypeNodePair.dn, expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes, @@ -619,7 +602,7 @@ public class StoragePolicySatisfier implements Runnable { if (blockInfo.isStriped()) { buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn, existingTypeNodePair.storageType, chosenTarget.dn, - chosenTarget.storageType, blockMovingInfos); + chosenTarget.storageType, blockMovingInfos, ecPolicy); } else { buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn, existingTypeNodePair.storageType, chosenTarget.dn, @@ -645,7 +628,7 @@ public class StoragePolicySatisfier implements Runnable { } private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos, - DatanodeDescriptor dn) { + DatanodeInfo dn) { for (BlockMovingInfo blockMovingInfo : blockMovingInfos) { if (blockMovingInfo.getSource().equals(dn)) { return true; @@ -654,37 +637,40 @@ public class StoragePolicySatisfier implements Runnable { return false; } - private void buildContinuousBlockMovingInfos(BlockInfo blockInfo, + private void buildContinuousBlockMovingInfos(LocatedBlock blockInfo, DatanodeInfo sourceNode, StorageType sourceStorageType, DatanodeInfo targetNode, StorageType targetStorageType, List<BlockMovingInfo> blkMovingInfos) { - Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(), - blockInfo.getGenerationStamp()); + Block blk = ExtendedBlock.getLocalBlock(blockInfo.getBlock()); BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode, targetNode, sourceStorageType, targetStorageType); blkMovingInfos.add(blkMovingInfo); } - private void buildStripedBlockMovingInfos(BlockInfo blockInfo, + private void buildStripedBlockMovingInfos(LocatedBlock blockInfo, DatanodeInfo sourceNode, StorageType sourceStorageType, DatanodeInfo targetNode, StorageType targetStorageType, - List<BlockMovingInfo> blkMovingInfos) { + List<BlockMovingInfo> blkMovingInfos, ErasureCodingPolicy ecPolicy) { // For a striped block, it needs 
to construct internal block at the given // index of a block group. Here it is iterating over all the block indices // and construct internal blocks which can be then considered for block // movement. - BlockInfoStriped sBlockInfo = (BlockInfoStriped) blockInfo; - for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) { - if (si.getBlockIndex() >= 0) { - DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor(); - if (sourceNode.equals(dn)) { + LocatedStripedBlock sBlockInfo = (LocatedStripedBlock) blockInfo; + byte[] indices = sBlockInfo.getBlockIndices(); + DatanodeInfo[] locations = sBlockInfo.getLocations(); + for (int i = 0; i < indices.length; i++) { + byte blkIndex = indices[i]; + if (blkIndex >= 0) { + // pick block movement only for the given source node. + if (sourceNode.equals(locations[i])) { // construct internal block - long blockId = blockInfo.getBlockId() + si.getBlockIndex(); + ExtendedBlock extBlock = sBlockInfo.getBlock(); long numBytes = StripedBlockUtil.getInternalBlockLength( - sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(), - sBlockInfo.getDataBlockNum(), si.getBlockIndex()); - Block blk = new Block(blockId, numBytes, - blockInfo.getGenerationStamp()); + extBlock.getNumBytes(), ecPolicy, blkIndex); + Block blk = new Block(ExtendedBlock.getLocalBlock(extBlock)); + long blkId = blk.getBlockId() + blkIndex; + blk.setBlockId(blkId); + blk.setNumBytes(numBytes); BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode, targetNode, sourceStorageType, targetStorageType); blkMovingInfos.add(blkMovingInfo); @@ -703,34 +689,35 @@ public class StoragePolicySatisfier implements Runnable { * @param targetTypes * - list of target storage types */ - private StorageTypeNodePair chooseTargetTypeInSameNode(Block block, - DatanodeDescriptor source, List<StorageType> targetTypes) { + private StorageTypeNodePair chooseTargetTypeInSameNode(LocatedBlock blockInfo, + DatanodeInfo source, List<StorageType> targetTypes) { for 
(StorageType t : targetTypes) { - DatanodeStorageInfo chooseStorage4Block = - source.chooseStorage4Block(t, block.getNumBytes()); - if (chooseStorage4Block != null) { + boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling( + source, t, blockInfo.getBlockSize()); + if (goodTargetDn) { return new StorageTypeNodePair(t, source); } } return null; } - private StorageTypeNodePair chooseTarget(Block block, - DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher, + private StorageTypeNodePair chooseTarget(LocatedBlock block, + DatanodeInfo source, List<StorageType> targetTypes, Matcher matcher, StorageTypeNodeMap locsForExpectedStorageTypes, - List<DatanodeDescriptor> excludeNodes) { + List<DatanodeInfo> excludeNodes) { for (StorageType t : targetTypes) { - List<DatanodeDescriptor> nodesWithStorages = - locsForExpectedStorageTypes.getNodesWithStorages(t); + List<DatanodeInfo> nodesWithStorages = locsForExpectedStorageTypes + .getNodesWithStorages(t); if (nodesWithStorages == null || nodesWithStorages.isEmpty()) { continue; // no target nodes with the required storage type. 
} Collections.shuffle(nodesWithStorages); - for (DatanodeDescriptor target : nodesWithStorages) { - if (!excludeNodes.contains(target) && matcher.match( - blockManager.getDatanodeManager().getNetworkTopology(), source, - target)) { - if (null != target.chooseStorage4Block(t, block.getNumBytes())) { + for (DatanodeInfo target : nodesWithStorages) { + if (!excludeNodes.contains(target) + && matcher.match(ctxt.getNetworkTopology(), source, target)) { + boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling( + target, t, block.getBlockSize()); + if (goodTargetDn) { return new StorageTypeNodePair(t, target); } } @@ -741,27 +728,25 @@ public class StoragePolicySatisfier implements Runnable { private static class StorageTypeNodePair { private StorageType storageType = null; - private DatanodeDescriptor dn = null; + private DatanodeInfo dn = null; - StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) { + StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) { this.storageType = storageType; this.dn = dn; } } private StorageTypeNodeMap findTargetsForExpectedStorageTypes( - List<StorageType> expected) { + List<StorageType> expected, DatanodeStorageReport[] liveDns) { StorageTypeNodeMap targetMap = new StorageTypeNodeMap(); - List<DatanodeDescriptor> reports = blockManager.getDatanodeManager() - .getDatanodeListForReport(DatanodeReportType.LIVE); - for (DatanodeDescriptor dn : reports) { + for (DatanodeStorageReport dn : liveDns) { StorageReport[] storageReports = dn.getStorageReports(); for (StorageReport storageReport : storageReports) { StorageType t = storageReport.getStorage().getStorageType(); if (expected.contains(t)) { final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t); if (maxRemaining > 0L) { - targetMap.add(t, dn); + targetMap.add(t, dn.getDatanodeInfo()); } } } @@ -782,32 +767,40 @@ public class StoragePolicySatisfier implements Runnable { return max; } - private boolean 
checkSourceAndTargetTypeExists(DatanodeDescriptor dn, - List<StorageType> existing, List<StorageType> expectedStorageTypes) { - DatanodeStorageInfo[] allDNStorageInfos = dn.getStorageInfos(); + private boolean checkSourceAndTargetTypeExists(DatanodeInfo dn, + List<StorageType> existing, List<StorageType> expectedStorageTypes, + DatanodeStorageReport[] liveDns) { boolean isExpectedTypeAvailable = false; boolean isExistingTypeAvailable = false; - for (DatanodeStorageInfo dnInfo : allDNStorageInfos) { - StorageType storageType = dnInfo.getStorageType(); - if (existing.contains(storageType)) { - isExistingTypeAvailable = true; - } - if (expectedStorageTypes.contains(storageType)) { - isExpectedTypeAvailable = true; + for (DatanodeStorageReport liveDn : liveDns) { + if (dn.equals(liveDn.getDatanodeInfo())) { + StorageReport[] storageReports = liveDn.getStorageReports(); + for (StorageReport eachStorage : storageReports) { + StorageType storageType = eachStorage.getStorage().getStorageType(); + if (existing.contains(storageType)) { + isExistingTypeAvailable = true; + } + if (expectedStorageTypes.contains(storageType)) { + isExpectedTypeAvailable = true; + } + if (isExistingTypeAvailable && isExpectedTypeAvailable) { + return true; + } + } } } return isExistingTypeAvailable && isExpectedTypeAvailable; } private static class StorageTypeNodeMap { - private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap = - new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class); + private final EnumMap<StorageType, List<DatanodeInfo>> typeNodeMap = + new EnumMap<StorageType, List<DatanodeInfo>>(StorageType.class); - private void add(StorageType t, DatanodeDescriptor dn) { - List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t); - LinkedList<DatanodeDescriptor> value = null; + private void add(StorageType t, DatanodeInfo dn) { + List<DatanodeInfo> nodesWithStorages = getNodesWithStorages(t); + LinkedList<DatanodeInfo> value = null; if 
(nodesWithStorages == null) { - value = new LinkedList<DatanodeDescriptor>(); + value = new LinkedList<DatanodeInfo>(); value.add(dn); typeNodeMap.put(t, value); } else { @@ -820,7 +813,7 @@ public class StoragePolicySatisfier implements Runnable { * - Storage type * @return datanodes which has the given storage type */ - private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) { + private List<DatanodeInfo> getNodesWithStorages(StorageType type) { return typeNodeMap.get(type); } } @@ -982,7 +975,6 @@ public class StoragePolicySatisfier implements Runnable { public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus( String path) throws IOException { - INode inode = namesystem.getFSDirectory().getINode(path); - return storageMovementNeeded.getStatus(inode.getId()); + return storageMovementNeeded.getStatus(ctxt.getFileID(path)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java index 62766d9..f9762a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java @@ -25,8 +25,9 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.Block; -import 
org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo; import org.junit.After; @@ -46,11 +47,15 @@ public class TestBlockStorageMovementAttemptedItems { @Before public void setup() throws Exception { - unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded( - Mockito.mock(Namesystem.class), - Mockito.mock(StoragePolicySatisfier.class), 100); - bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, - selfRetryTimeout, unsatisfiedStorageMovementFiles); + Configuration config = new HdfsConfiguration(); + Context ctxt = Mockito.mock(Context.class); + Mockito.when(ctxt.getConf()).thenReturn(config); + Mockito.when(ctxt.isRunning()).thenReturn(true); + Mockito.when(ctxt.isInSafeMode()).thenReturn(false); + Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true); + unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(ctxt); + bsmAttemptedItems = new BlockStorageMovementAttemptedItems(ctxt, + unsatisfiedStorageMovementFiles); } @After http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java index 8dc52dc..2a7bde5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java @@ -18,6 +18,7 @@ package 
org.apache.hadoop.hdfs.server.namenode.sps; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -68,6 +69,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils.LogCapturer; +import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -105,7 +107,8 @@ public class TestStoragePolicySatisfier { private DistributedFileSystem dfs = null; private static final int DEFAULT_BLOCK_SIZE = 1024; - private void shutdownCluster() { + @After + public void shutdownCluster() { if (hdfsCluster != null) { hdfsCluster.shutdown(); } @@ -1298,11 +1301,17 @@ public class TestStoragePolicySatisfier { //entry in queue. After 10 files, traverse control will be on U. StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); Mockito.when(sps.isRunning()).thenReturn(true); + Context ctxt = Mockito.mock(Context.class); + config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10); + Mockito.when(ctxt.getConf()).thenReturn(config); + Mockito.when(ctxt.isRunning()).thenReturn(true); + Mockito.when(ctxt.isInSafeMode()).thenReturn(false); + Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true); BlockStorageMovementNeeded movmentNeededQueue = - new BlockStorageMovementNeeded(hdfsCluster.getNamesystem(), sps, 10); + new BlockStorageMovementNeeded(ctxt); INode rootINode = fsDir.getINode("/root"); movmentNeededQueue.addToPendingDirQueue(rootINode.getId()); - movmentNeededQueue.init(); + movmentNeededQueue.init(fsDir); //Wait for thread to reach U. 
Thread.sleep(1000); @@ -1361,9 +1370,15 @@ public class TestStoragePolicySatisfier { Mockito.when(sps.isRunning()).thenReturn(true); // Queue limit can control the traverse logic to wait for some free // entry in queue. After 10 files, traverse control will be on U. + Context ctxt = Mockito.mock(Context.class); + config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10); + Mockito.when(ctxt.getConf()).thenReturn(config); + Mockito.when(ctxt.isRunning()).thenReturn(true); + Mockito.when(ctxt.isInSafeMode()).thenReturn(false); + Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true); BlockStorageMovementNeeded movmentNeededQueue = - new BlockStorageMovementNeeded(hdfsCluster.getNamesystem(), sps, 10); - movmentNeededQueue.init(); + new BlockStorageMovementNeeded(ctxt); + movmentNeededQueue.init(fsDir); INode rootINode = fsDir.getINode("/root"); movmentNeededQueue.addToPendingDirQueue(rootINode.getId()); // Wait for thread to reach U. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org