HDFS-12911. [SPS]: Modularize the SPS code and expose necessary interfaces for external/internal implementations. Contributed by Uma Maheswara Rao G
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a0c8e48c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a0c8e48c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a0c8e48c Branch: refs/heads/HDFS-10285 Commit: a0c8e48c40e31d741b192a649d1a7773e2384437 Parents: 113185e Author: Rakesh Radhakrishnan <rake...@apache.org> Authored: Fri Jan 19 08:51:49 2018 +0530 Committer: Rakesh Radhakrishnan <rake...@apache.org> Committed: Tue Jul 31 12:10:15 2018 +0530 ---------------------------------------------------------------------- .../server/blockmanagement/BlockManager.java | 61 +++++- .../namenode/FSDirSatisfyStoragePolicyOp.java | 16 +- .../hdfs/server/namenode/FSDirectory.java | 6 +- .../hdfs/server/namenode/FSNamesystem.java | 10 +- .../namenode/sps/BlockMoveTaskHandler.java | 44 ++++ .../namenode/sps/BlockMovementListener.java | 40 ++++ .../sps/BlockStorageMovementAttemptedItems.java | 28 +-- .../sps/BlockStorageMovementNeeded.java | 207 ++++--------------- .../hdfs/server/namenode/sps/Context.java | 43 ++-- .../server/namenode/sps/FileIdCollector.java | 43 ++++ .../IntraSPSNameNodeBlockMoveTaskHandler.java | 62 ++++++ .../namenode/sps/IntraSPSNameNodeContext.java | 62 ++---- .../sps/IntraSPSNameNodeFileIdCollector.java | 178 ++++++++++++++++ .../hdfs/server/namenode/sps/ItemInfo.java | 81 ++++++++ .../hdfs/server/namenode/sps/SPSPathIds.java | 63 ++++++ .../hdfs/server/namenode/sps/SPSService.java | 107 ++++++++++ .../namenode/sps/StoragePolicySatisfier.java | 175 +++++++--------- .../TestBlockStorageMovementAttemptedItems.java | 19 +- .../sps/TestStoragePolicySatisfier.java | 111 ++++++---- ...stStoragePolicySatisfierWithStripedFile.java | 19 +- 20 files changed, 938 insertions(+), 437 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index c2d5162..63117ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -93,8 +93,8 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; -import org.apache.hadoop.hdfs.server.namenode.sps.Context; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext; +import org.apache.hadoop.hdfs.server.namenode.sps.SPSPathIds; +import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; @@ -434,7 +434,8 @@ public class BlockManager implements BlockStatsMXBean { private final StoragePolicySatisfier sps; private final boolean storagePolicyEnabled; private boolean spsEnabled; - private Context spsctxt = null; + private final SPSPathIds spsPaths; + /** Minimum live replicas needed for the datanode to be transitioned * from ENTERING_MAINTENANCE to IN_MAINTENANCE. 
*/ @@ -481,8 +482,8 @@ public class BlockManager implements BlockStatsMXBean { conf.getBoolean( DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT); - spsctxt = new IntraSPSNameNodeContext(namesystem, this, conf); - sps = new StoragePolicySatisfier(spsctxt); + sps = new StoragePolicySatisfier(conf); + spsPaths = new SPSPathIds(); blockTokenSecretManager = createBlockTokenSecretManager(conf); providedStorageMap = new ProvidedStorageMap(namesystem, this, conf); @@ -5033,8 +5034,7 @@ public class BlockManager implements BlockStatsMXBean { LOG.info("Storage policy satisfier is already running."); return; } - // TODO: FSDirectory will get removed via HDFS-12911 modularization work - sps.start(false, namesystem.getFSDirectory()); + sps.start(false); } /** @@ -5070,8 +5070,7 @@ public class BlockManager implements BlockStatsMXBean { LOG.info("Storage policy satisfier is already running."); return; } - // TODO: FSDirectory will get removed via HDFS-12911 modularization work - sps.start(true, namesystem.getFSDirectory()); + sps.start(true); } /** @@ -5111,4 +5110,48 @@ public class BlockManager implements BlockStatsMXBean { String path) throws IOException { return sps.checkStoragePolicySatisfyPathStatus(path); } + + /** + * @return SPS service instance. + */ + public SPSService getSPSService() { + return this.sps; + } + + /** + * @return the next SPS path id, on which path users has invoked to satisfy + * storages. + */ + public Long getNextSPSPathId() { + return spsPaths.pollNext(); + } + + /** + * Removes the SPS path id from the list of sps paths. + */ + public void removeSPSPathId(long trackId) { + spsPaths.remove(trackId); + } + + /** + * Clean up all sps path ids. + */ + public void removeAllSPSPathIds() { + spsPaths.clear(); + } + + /** + * Adds the sps path to SPSPathIds list. + */ + public void addSPSPathId(long id) { + spsPaths.add(id); + } + + /** + * @return true if sps enabled. 
+ */ + public boolean isSPSEnabled() { + return spsEnabled; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java index fb6eec9..eed6e52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp; -import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import com.google.common.collect.Lists; @@ -87,21 +86,14 @@ final class FSDirSatisfyStoragePolicyOp { } static boolean unprotectedSatisfyStoragePolicy(INode inode, FSDirectory fsd) { - if (inode.isFile() && inode.asFile().numBlocks() != 0) { - // Adding directly in the storageMovementNeeded queue, So it can - // get more priority compare to directory. 
- fsd.getBlockManager().getStoragePolicySatisfier() - .satisfyStoragePolicy(inode.getId()); - return true; - } else if (inode.isDirectory() - && inode.asDirectory().getChildrenNum(Snapshot.CURRENT_STATE_ID) > 0) { + if (inode.isFile() && inode.asFile().numBlocks() == 0) { + return false; + } else { // Adding directory in the pending queue, so FileInodeIdCollector process // directory child in batch and recursively - fsd.getBlockManager().getStoragePolicySatisfier() - .addInodeToPendingDirQueue(inode.getId()); + fsd.getBlockManager().addSPSPathId(inode.getId()); return true; } - return false; } private static boolean inodeHasSatisfyXAttr(INode inode) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 35341d7..2c9d627 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -1401,14 +1401,16 @@ public class FSDirectory implements Closeable { if (!inode.isSymlink()) { final XAttrFeature xaf = inode.getXAttrFeature(); addEncryptionZone((INodeWithAdditionalFields) inode, xaf); - addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf); + if (namesystem.getBlockManager().isSPSEnabled()) { + addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf); + } } } } private void addStoragePolicySatisfier(INodeWithAdditionalFields inode, XAttrFeature xaf) { - if (xaf == null || inode.isDirectory()) { + if (xaf == null) { return; } XAttr xattr = xaf.getXAttr(XATTR_SATISFY_STORAGE_POLICY); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 25a45c4..352edbd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -258,6 +258,9 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager; +import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler; +import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext; +import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; @@ -1291,7 +1294,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir, edekCacheLoaderDelay, edekCacheLoaderInterval); } - + blockManager.getSPSService().init( + new IntraSPSNameNodeContext(this, blockManager, + blockManager.getSPSService()), + new IntraSPSNameNodeFileIdCollector(getFSDirectory(), + blockManager.getSPSService()), + new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this)); blockManager.startSPS(); } finally 
{ startingActiveService = false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java new file mode 100644 index 0000000..e6f78e1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode.sps; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; + +/** + * Interface for implementing different ways of block moving approaches. 
One can + * connect directly to DN and request block move, and other can talk NN to + * schedule via heart-beats. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface BlockMoveTaskHandler { + + /** + * This is an interface method to handle the move tasks. BlockMovingInfo must + * contain the required info to move the block, that source location, + * destination location and storage types. + */ + void submitMoveTask(BlockMovingInfo blkMovingInfo, + BlockMovementListener blockMoveCompletionListener) throws IOException; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMovementListener.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMovementListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMovementListener.java new file mode 100644 index 0000000..36473f3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMovementListener.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode.sps; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.Block; + +/** + * Interface for notifying about block movement attempt completion. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface BlockMovementListener { + + /** + * This method used to notify to the SPS about block movement attempt + * finished. Then SPS will re-check whether it needs retry or not. + * + * @param moveAttemptFinishedBlks + * -list of movement attempt finished blocks + */ + void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java index 1cae027..3f0155d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java @@ -32,7 +32,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo; -import 
org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +46,8 @@ import com.google.common.annotations.VisibleForTesting; * finished for a longer time period, then such items will retries automatically * after timeout. The default timeout would be 5 minutes. */ -public class BlockStorageMovementAttemptedItems { +public class BlockStorageMovementAttemptedItems + implements BlockMovementListener { private static final Logger LOG = LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class); @@ -71,19 +71,19 @@ public class BlockStorageMovementAttemptedItems { // private long minCheckTimeout = 1 * 60 * 1000; // minimum value private BlockStorageMovementNeeded blockStorageMovementNeeded; - private final Context ctxt; + private final SPSService service; - public BlockStorageMovementAttemptedItems(Context context, + public BlockStorageMovementAttemptedItems(SPSService service, BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) { - this.ctxt = context; - long recheckTimeout = ctxt.getConf().getLong( + this.service = service; + long recheckTimeout = this.service.getConf().getLong( DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT); if (recheckTimeout > 0) { this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout); } - this.selfRetryTimeout = ctxt.getConf().getLong( + this.selfRetryTimeout = this.service.getConf().getLong( DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY, DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT); this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles; @@ -111,7 +111,7 @@ public class BlockStorageMovementAttemptedItems { * @param moveAttemptFinishedBlks * storage movement attempt finished blocks */ - public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) { + public void 
notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { if (moveAttemptFinishedBlks.length == 0) { return; } @@ -191,7 +191,7 @@ public class BlockStorageMovementAttemptedItems { AttemptedItemInfo itemInfo = iter.next(); if (now > itemInfo.getLastAttemptedOrReportedTime() + selfRetryTimeout) { - Long blockCollectionID = itemInfo.getTrackId(); + Long blockCollectionID = itemInfo.getFileId(); synchronized (movementFinishedBlocks) { ItemInfo candidate = new ItemInfo(itemInfo.getStartId(), blockCollectionID, itemInfo.getRetryCount() + 1); @@ -223,7 +223,7 @@ public class BlockStorageMovementAttemptedItems { // gets the chance first and can be cleaned from queue quickly as // all movements already done. blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo - .getStartId(), attemptedItemInfo.getTrackId(), + .getStartId(), attemptedItemInfo.getFileId(), attemptedItemInfo.getRetryCount() + 1)); iterator.remove(); } @@ -246,7 +246,11 @@ public class BlockStorageMovementAttemptedItems { } public void clearQueues() { - movementFinishedBlocks.clear(); - storageMovementAttemptedItems.clear(); + synchronized (movementFinishedBlocks) { + movementFinishedBlocks.clear(); + } + synchronized (storageMovementAttemptedItems) { + storageMovementAttemptedItems.clear(); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java index 80f1893..39a0051 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java @@ -17,11 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode.sps; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY; - import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -33,12 +29,6 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.server.namenode.FSDirectory; -import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser; -import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo; -import org.apache.hadoop.hdfs.server.namenode.INode; -import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Time; import org.slf4j.Logger; @@ -75,22 +65,21 @@ public class BlockStorageMovementNeeded { private final Context ctxt; - // List of pending dir to satisfy the policy - private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>(); + private Daemon pathIdCollector; - private Daemon inodeIdCollector; + private FileIdCollector fileIDCollector; - private final int maxQueuedItem; + private SPSPathIdProcessor pathIDProcessor; // Amount of time to cache the SUCCESS status of path before turning it to // NOT_AVAILABLE. 
private static long statusClearanceElapsedTimeMs = 300000; - public BlockStorageMovementNeeded(Context context) { + public BlockStorageMovementNeeded(Context context, + FileIdCollector fileIDCollector) { this.ctxt = context; - this.maxQueuedItem = ctxt.getConf().getInt( - DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, - DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT); + this.fileIDCollector = fileIDCollector; + pathIDProcessor = new SPSPathIdProcessor(); } /** @@ -140,29 +129,6 @@ public class BlockStorageMovementNeeded { return storageMovementNeeded.poll(); } - public synchronized void addToPendingDirQueue(long id) { - spsStatus.put(id, new StoragePolicySatisfyPathStatusInfo( - StoragePolicySatisfyPathStatus.PENDING)); - spsDirsToBeTraveresed.add(id); - // Notify waiting FileInodeIdCollector thread about the newly - // added SPS path. - synchronized (spsDirsToBeTraveresed) { - spsDirsToBeTraveresed.notify(); - } - } - - /** - * Returns queue remaining capacity. - */ - public synchronized int remainingCapacity() { - int size = storageMovementNeeded.size(); - if (size >= maxQueuedItem) { - return 0; - } else { - return (maxQueuedItem - size); - } - } - /** * Returns queue size. */ @@ -171,7 +137,7 @@ public class BlockStorageMovementNeeded { } public synchronized void clearAll() { - spsDirsToBeTraveresed.clear(); + ctxt.removeAllSPSPathIds(); storageMovementNeeded.clear(); pendingWorkForDirectory.clear(); } @@ -206,13 +172,13 @@ public class BlockStorageMovementNeeded { } else { // Remove xAttr if trackID doesn't exist in // storageMovementAttemptedItems or file policy satisfied. 
- ctxt.removeSPSHint(trackInfo.getTrackId()); + ctxt.removeSPSHint(trackInfo.getFileId()); updateStatus(trackInfo.getStartId(), isSuccess); } } public synchronized void clearQueue(long trackId) { - spsDirsToBeTraveresed.remove(trackId); + ctxt.removeSPSPathId(trackId); Iterator<ItemInfo> iterator = storageMovementNeeded.iterator(); while (iterator.hasNext()) { ItemInfo next = iterator.next(); @@ -249,7 +215,7 @@ public class BlockStorageMovementNeeded { public synchronized void clearQueuesWithNotification() { // Remove xAttr from directories Long trackId; - while ((trackId = spsDirsToBeTraveresed.poll()) != null) { + while ((trackId = ctxt.getNextSPSPathId()) != null) { try { // Remove xAttr for file ctxt.removeSPSHint(trackId); @@ -265,12 +231,12 @@ public class BlockStorageMovementNeeded { try { // Remove xAttr for file if (!itemInfo.isDir()) { - ctxt.removeSPSHint(itemInfo.getTrackId()); + ctxt.removeSPSHint(itemInfo.getFileId()); } } catch (IOException ie) { LOG.warn( "Failed to remove SPS xattr for track id " - + itemInfo.getTrackId(), ie); + + itemInfo.getFileId(), ie); } } this.clearAll(); @@ -280,57 +246,33 @@ public class BlockStorageMovementNeeded { * Take dir tack ID from the spsDirsToBeTraveresed queue and collect child * ID's to process for satisfy the policy. 
*/ - private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser - implements Runnable { - - private int remainingCapacity = 0; - - private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem); - - StorageMovementPendingInodeIdCollector(FSDirectory dir) { - super(dir); - } + private class SPSPathIdProcessor implements Runnable { @Override public void run() { LOG.info("Starting FileInodeIdCollector!."); long lastStatusCleanTime = 0; while (ctxt.isRunning()) { + LOG.info("Running FileInodeIdCollector!."); try { if (!ctxt.isInSafeMode()) { - Long startINodeId = spsDirsToBeTraveresed.poll(); + Long startINodeId = ctxt.getNextSPSPathId(); if (startINodeId == null) { // Waiting for SPS path - synchronized (spsDirsToBeTraveresed) { - spsDirsToBeTraveresed.wait(5000); - } + Thread.sleep(3000); } else { - INode startInode = getFSDirectory().getInode(startINodeId); - if (startInode != null) { - try { - remainingCapacity = remainingCapacity(); - spsStatus.put(startINodeId, - new StoragePolicySatisfyPathStatusInfo( - StoragePolicySatisfyPathStatus.IN_PROGRESS)); - readLock(); - traverseDir(startInode.asDirectory(), startINodeId, - HdfsFileStatus.EMPTY_NAME, - new SPSTraverseInfo(startINodeId)); - } finally { - readUnlock(); - } - // Mark startInode traverse is done - addAll(startInode.getId(), currentBatch, true); - currentBatch.clear(); - - // check if directory was empty and no child added to queue - DirPendingWorkInfo dirPendingWorkInfo = - pendingWorkForDirectory.get(startInode.getId()); - if (dirPendingWorkInfo.isDirWorkDone()) { - ctxt.removeSPSHint(startInode.getId()); - pendingWorkForDirectory.remove(startInode.getId()); - updateStatus(startInode.getId(), true); - } + spsStatus.put(startINodeId, + new StoragePolicySatisfyPathStatusInfo( + StoragePolicySatisfyPathStatus.IN_PROGRESS)); + fileIDCollector.scanAndCollectFileIds(startINodeId); + // check if directory was empty and no child added to queue + DirPendingWorkInfo dirPendingWorkInfo = + 
pendingWorkForDirectory.get(startINodeId); + if (dirPendingWorkInfo != null + && dirPendingWorkInfo.isDirWorkDone()) { + ctxt.removeSPSHint(startINodeId); + pendingWorkForDirectory.remove(startINodeId); + updateStatus(startINodeId, true); } } //Clear the SPS status if status is in SUCCESS more than 5 min. @@ -355,71 +297,6 @@ public class BlockStorageMovementNeeded { } } } - - @Override - protected void checkPauseForTesting() throws InterruptedException { - // TODO implement if needed - } - - @Override - protected boolean processFileInode(INode inode, TraverseInfo traverseInfo) - throws IOException, InterruptedException { - if (LOG.isTraceEnabled()) { - LOG.trace("Processing {} for statisy the policy", - inode.getFullPathName()); - } - if (!inode.isFile()) { - return false; - } - if (inode.isFile() && inode.asFile().numBlocks() != 0) { - currentBatch.add(new ItemInfo( - ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId())); - remainingCapacity--; - } - return true; - } - - @Override - protected boolean canSubmitCurrentBatch() { - return remainingCapacity <= 0; - } - - @Override - protected void checkINodeReady(long startId) throws IOException { - // SPS work won't be scheduled if NN is in standby. So, skipping NN - // standby check. 
- return; - } - - @Override - protected void submitCurrentBatch(long startId) - throws IOException, InterruptedException { - // Add current child's to queue - addAll(startId, currentBatch, false); - currentBatch.clear(); - } - - @Override - protected void throttle() throws InterruptedException { - if (LOG.isDebugEnabled()) { - LOG.debug("StorageMovementNeeded queue remaining capacity is zero," - + " waiting for some free slots."); - } - remainingCapacity = remainingCapacity(); - // wait for queue to be free - while (remainingCapacity <= 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting for storageMovementNeeded queue to be free!"); - } - Thread.sleep(5000); - remainingCapacity = remainingCapacity(); - } - } - - @Override - protected boolean canTraverseDir(INode inode) throws IOException { - return true; - } } /** @@ -476,29 +353,15 @@ public class BlockStorageMovementNeeded { } } - // TODO: FSDirectory will get removed via HDFS-12911 modularization work - public void init(FSDirectory fsd) { - inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector( - fsd)); - inodeIdCollector.setName("FileInodeIdCollector"); - inodeIdCollector.start(); + public void activate() { + pathIdCollector = new Daemon(pathIDProcessor); + pathIdCollector.setName("SPSPathIdProcessor"); + pathIdCollector.start(); } public void close() { - if (inodeIdCollector != null) { - inodeIdCollector.interrupt(); - } - } - - class SPSTraverseInfo extends TraverseInfo { - private long startId; - - SPSTraverseInfo(long startId) { - this.startId = startId; - } - - public long getStartId() { - return startId; + if (pathIdCollector != null) { + pathIdCollector.interrupt(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java index d11e26f..b7053b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java @@ -19,11 +19,9 @@ package org.apache.hadoop.hdfs.server.namenode.sps; import java.io.IOException; -import java.util.function.Supplier; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.UnresolvedLinkException; @@ -31,7 +29,6 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; -import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.AccessControlException; @@ -43,24 +40,11 @@ import org.apache.hadoop.security.AccessControlException; public interface Context { /** - * Returns configuration object. - */ - Configuration getConf(); - - /** * Returns true if the SPS is running, false otherwise. */ boolean isRunning(); /** - * Update the SPS running status. - * - * @param isSpsRunning - * true represents running, false otherwise - */ - void setSPSRunning(Supplier<Boolean> isSpsRunning); - - /** * Returns true if the Namenode in safe mode, false otherwise. 
*/ boolean isInSafeMode(); @@ -153,17 +137,6 @@ public interface Context { boolean hasLowRedundancyBlocks(long inodeID); /** - * Assign the given block movement task to the target node present in - * {@link BlockMovingInfo}. - * - * @param blkMovingInfo - * block to storage info - * @throws IOException - */ - void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo) - throws IOException; - - /** * Checks whether the given datanode has sufficient space to occupy the given * blockSize data. * @@ -178,4 +151,20 @@ public interface Context { */ boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn, StorageType type, long blockSize); + + /** + * @return next SPS path id to process. + */ + Long getNextSPSPathId(); + + /** + * Removes the SPS path id. + */ + void removeSPSPathId(long pathId); + + /** + * Removes all SPS path ids. + */ + void removeAllSPSPathIds(); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java new file mode 100644 index 0000000..7cf77f0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode.sps; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * An interface for scanning the directory recursively and collecting file ids + * under the given directory. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface FileIdCollector { + + /** + * Scans the given inode directory and collects the file ids under that + * directory and adds to the given BlockStorageMovementNeeded. + * + * @param inodeId + * - The directory ID + */ + void scanAndCollectFileIds(Long inodeId) + throws IOException, InterruptedException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java new file mode 100644 index 0000000..1da4af9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode.sps; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; + +/** + * This class handles the internal SPS block movements. This will assign block + * movement tasks to target datanode descriptors. 
+ */ +public class IntraSPSNameNodeBlockMoveTaskHandler + implements BlockMoveTaskHandler { + + private BlockManager blockManager; + private Namesystem namesystem; + + public IntraSPSNameNodeBlockMoveTaskHandler(BlockManager blockManager, + Namesystem namesytem) { + this.blockManager = blockManager; + this.namesystem = namesytem; + } + + @Override + public void submitMoveTask(BlockMovingInfo blkMovingInfo, + BlockMovementListener blockMoveCompletionListener) throws IOException { + namesystem.readLock(); + try { + DatanodeDescriptor dn = blockManager.getDatanodeManager() + .getDatanode(blkMovingInfo.getTarget().getDatanodeUuid()); + if (dn == null) { + throw new IOException("Failed to schedule block movement task:" + + blkMovingInfo + " as target datanode: " + + blkMovingInfo.getTarget() + " doesn't exists"); + } + dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType()); + dn.addBlocksToMoveStorage(blkMovingInfo); + } finally { + namesystem.readUnlock(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java index 6654212..cef26ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java @@ -20,10 +20,8 @@ package org.apache.hadoop.hdfs.server.namenode.sps; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import java.io.IOException; 
-import java.util.function.Supplier; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.UnresolvedLinkException; @@ -38,7 +36,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; -import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.AccessControlException; import org.slf4j.Logger; @@ -55,15 +52,14 @@ public class IntraSPSNameNodeContext implements Context { private final Namesystem namesystem; private final BlockManager blockManager; - private final Configuration conf; - private Supplier<Boolean> isSpsRunning; + + private SPSService service; public IntraSPSNameNodeContext(Namesystem namesystem, - BlockManager blockManager, Configuration conf) { + BlockManager blockManager, SPSService service) { this.namesystem = namesystem; this.blockManager = blockManager; - this.conf = conf; - isSpsRunning = () -> false; + this.service = service; } @Override @@ -111,11 +107,6 @@ public class IntraSPSNameNodeContext implements Context { } @Override - public Configuration getConf() { - return conf; - } - - @Override public boolean isFileExist(long inodeId) { return namesystem.getFSDirectory().getInode(inodeId) != null; } @@ -127,16 +118,7 @@ public class IntraSPSNameNodeContext implements Context { @Override public boolean isRunning() { - // TODO : 'isSpsRunning' flag has been added to avoid the NN lock inside - // SPS. Context interface will be further refined as part of HDFS-12911 - // modularization task. One idea is to introduce a cleaner interface similar - // to Namesystem for better abstraction. 
- return namesystem.isRunning() && isSpsRunning.get(); - } - - @Override - public void setSPSRunning(Supplier<Boolean> spsRunningFlag) { - this.isSpsRunning = spsRunningFlag; + return namesystem.isRunning() && service.isRunning(); } @Override @@ -183,25 +165,6 @@ public class IntraSPSNameNodeContext implements Context { } @Override - public void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo) - throws IOException { - namesystem.readLock(); - try { - DatanodeDescriptor dn = blockManager.getDatanodeManager() - .getDatanode(blkMovingInfo.getTarget().getDatanodeUuid()); - if (dn == null) { - throw new IOException("Failed to schedule block movement task:" - + blkMovingInfo + " as target datanode: " - + blkMovingInfo.getTarget() + " doesn't exists"); - } - dn.addBlocksToMoveStorage(blkMovingInfo); - dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType()); - } finally { - namesystem.readUnlock(); - } - } - - @Override public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn, StorageType type, long blockSize) { namesystem.readLock(); @@ -217,4 +180,19 @@ public class IntraSPSNameNodeContext implements Context { namesystem.readUnlock(); } } + + @Override + public Long getNextSPSPathId() { + return blockManager.getNextSPSPathId(); + } + + @Override + public void removeSPSPathId(long trackId) { + blockManager.removeSPSPathId(trackId); + } + + @Override + public void removeAllSPSPathIds() { + blockManager.removeAllSPSPathIds(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java new file mode 100644 index 0000000..c6834c1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode.sps; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.namenode.FSDirectory; +import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser; +import org.apache.hadoop.hdfs.server.namenode.INode; + +/** + * A specific implementation for scanning the directory with Namenode internal + * Inode structure and collects the file ids under the given directory ID. 
+ */ +public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser + implements FileIdCollector { + private int maxQueueLimitToScan; + private final SPSService service; + + private int remainingCapacity = 0; + + private List<ItemInfo> currentBatch; + + public IntraSPSNameNodeFileIdCollector(FSDirectory dir, SPSService service) { + super(dir); + this.service = service; + this.maxQueueLimitToScan = service.getConf().getInt( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT); + currentBatch = new ArrayList<>(maxQueueLimitToScan); + } + + @Override + protected boolean processFileInode(INode inode, TraverseInfo traverseInfo) + throws IOException, InterruptedException { + if (LOG.isTraceEnabled()) { + LOG.trace("Processing {} for statisy the policy", + inode.getFullPathName()); + } + if (!inode.isFile()) { + return false; + } + if (inode.isFile() && inode.asFile().numBlocks() != 0) { + currentBatch.add(new ItemInfo( + ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId())); + remainingCapacity--; + } + return true; + } + + @Override + protected boolean canSubmitCurrentBatch() { + return remainingCapacity <= 0; + } + + @Override + protected void checkINodeReady(long startId) throws IOException { + // SPS work won't be scheduled if NN is in standby. So, skipping NN + // standby check. 
+ return; + } + + @Override + protected void submitCurrentBatch(long startId) + throws IOException, InterruptedException { + // Add current child's to queue + service.addAllFileIdsToProcess(startId, + currentBatch, false); + currentBatch.clear(); + } + + @Override + protected void throttle() throws InterruptedException { + if (LOG.isDebugEnabled()) { + LOG.debug("StorageMovementNeeded queue remaining capacity is zero," + + " waiting for some free slots."); + } + remainingCapacity = remainingCapacity(); + // wait for queue to be free + while (remainingCapacity <= 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting for storageMovementNeeded queue to be free!"); + } + Thread.sleep(5000); + remainingCapacity = remainingCapacity(); + } + } + + @Override + protected boolean canTraverseDir(INode inode) throws IOException { + return true; + } + + @Override + protected void checkPauseForTesting() throws InterruptedException { + // Nothing to do + } + + @Override + public void scanAndCollectFileIds(final Long startINodeId) + throws IOException, InterruptedException { + FSDirectory fsd = getFSDirectory(); + INode startInode = fsd.getInode(startINodeId); + if (startInode != null) { + remainingCapacity = remainingCapacity(); + if (remainingCapacity == 0) { + throttle(); + } + if (startInode.isFile()) { + currentBatch.add(new ItemInfo(startInode.getId(), startInode.getId())); + } else { + + readLock(); + // NOTE: this lock will not be held until full directory scanning. It is + // basically a sliced locking. Once it collects a batch size( at max the + // size of maxQueueLimitToScan (default 1000)) file ids, then it will + // unlock and submits the current batch to SPSService. Once + // service.processingQueueSize() shows empty slots, then lock will be + // resumed and scan also will be resumed. This logic was re-used from + // EDEK feature. 
+ try { + traverseDir(startInode.asDirectory(), startINodeId, + HdfsFileStatus.EMPTY_NAME, new SPSTraverseInfo(startINodeId)); + } finally { + readUnlock(); + } + } + // Mark startInode traverse is done, this is last-batch + service.addAllFileIdsToProcess(startInode.getId(), currentBatch, true); + currentBatch.clear(); + } + } + + /** + * Returns queue remaining capacity. + */ + public synchronized int remainingCapacity() { + int size = service.processingQueueSize(); + if (size >= maxQueueLimitToScan) { + return 0; + } else { + return (maxQueueLimitToScan - size); + } + } + + class SPSTraverseInfo extends TraverseInfo { + private long startId; + + SPSTraverseInfo(long startId) { + this.startId = startId; + } + + public long getStartId() { + return startId; + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java new file mode 100644 index 0000000..47c64cc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.sps; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * ItemInfo is a file info object for which need to satisfy the policy. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ItemInfo { + private long startId; + private long fileId; + private int retryCount; + + public ItemInfo(long startId, long fileId) { + this.startId = startId; + this.fileId = fileId; + // set 0 when item is getting added first time in queue. + this.retryCount = 0; + } + + public ItemInfo(final long startId, final long fileId, final int retryCount) { + this.startId = startId; + this.fileId = fileId; + this.retryCount = retryCount; + } + + /** + * Return the start inode id of the current track Id. This indicates that SPS + * was invoked on this inode id. + */ + public long getStartId() { + return startId; + } + + /** + * Return the File inode Id for which needs to satisfy the policy. + */ + public long getFileId() { + return fileId; + } + + /** + * Returns true if the tracking path is a directory, false otherwise. + */ + public boolean isDir() { + return (startId != fileId); + } + + /** + * Get the attempted retry count of the block for satisfy the policy. + */ + public int getRetryCount() { + return retryCount; + } + + /** + * Increments the retry count. 
+ */ + public void increRetryCount() { + this.retryCount++; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java new file mode 100644 index 0000000..cd6ad22 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.sps; + +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A class which holds the SPS invoked path ids. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SPSPathIds { + + // List of pending dir to satisfy the policy + private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>(); + + /** + * Add the path id to queue. + */ + public synchronized void add(long pathId) { + spsDirsToBeTraveresed.add(pathId); + } + + /** + * Removes the path id. + */ + public synchronized void remove(long pathId) { + spsDirsToBeTraveresed.remove(pathId); + } + + /** + * Clears all path ids. + */ + public synchronized void clear() { + spsDirsToBeTraveresed.clear(); + } + + /** + * @return next path id available in queue. + */ + public synchronized Long pollNext() { + return spsDirsToBeTraveresed.poll(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java new file mode 100644 index 0000000..6d85ea6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.sps; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +/** + * An interface for SPSService, which exposes life cycle and processing APIs. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface SPSService { + + /** + * Initializes the helper services. + * + * @param ctxt + * - context is a helper service to provide a communication channel + * between NN and SPS + * @param fileIDCollector + * - a helper service for scanning the files under a given directory + * id + * @param handler + * - a helper service for moving the blocks + */ + void init(Context ctxt, FileIdCollector fileIDCollector, + BlockMoveTaskHandler handler); + + /** + * Starts the SPS service. Make sure to initialize the helper services before + * invoking this method. + * + * @param reconfigStart + * - to indicate whether the SPS startup was requested from + * reconfiguration service + */ + void start(boolean reconfigStart); + + /** + * Stops the SPS service gracefully. Timed wait to stop storage policy + * satisfier daemon threads. + */ + void stopGracefully(); + + /** + * Disable the SPS service. + * + * @param forceStop + */ + void disable(boolean forceStop); + + /** + * Check whether StoragePolicySatisfier is running. + * + * @return true if running + */ + boolean isRunning(); + + /** + * Adds the Item information(file id etc) to processing queue.
+ * + * @param itemInfo + */ + void addFileIdToProcess(ItemInfo itemInfo); + + /** + * Adds all the Item information(file id etc) to processing queue. + * + * @param startId + * - directory/file id, on which SPS was called. + * @param itemInfoList + * - list of item infos + * @param scanCompleted + * - whether the scanning of directory fully done with itemInfoList + */ + void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList, + boolean scanCompleted); + + /** + * @return current processing queue size. + */ + int processingQueueSize(); + + /** + * @return the configuration. + */ + Configuration getConf(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java index b3e6b78..28c1372 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -29,6 +29,7 @@ import java.util.LinkedList; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; @@ -47,7 +48,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.balancer.Matcher; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; -import 
org.apache.hadoop.hdfs.server.namenode.FSDirectory; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; @@ -64,28 +64,34 @@ import com.google.common.annotations.VisibleForTesting; * storage policy type in Namespace, but physical block storage movement will * not happen until user runs "Mover Tool" explicitly for such files. The * StoragePolicySatisfier Daemon thread implemented for addressing the case - * where users may want to physically move the blocks by HDFS itself instead of - * running mover tool explicitly. Just calling client API to - * satisfyStoragePolicy on a file/dir will automatically trigger to move its - * physical storage locations as expected in asynchronous manner. Here Namenode - * will pick the file blocks which are expecting to change its storages, then it - * will build the mapping of source block location and expected storage type and - * location to move. After that this class will also prepare commands to send to - * Datanode for processing the physical block movements. + * where users may want to physically move the blocks by a dedicated daemon (can + * run inside Namenode or stand alone) instead of running mover tool explicitly. + * Just calling client API to satisfyStoragePolicy on a file/dir will + * automatically trigger to move its physical storage locations as expected in + * asynchronous manner. Here SPS will pick the file blocks which are expecting + * to change its storages, then it will build the mapping of source block + * location and expected storage type and location to move. After that this + * class will also prepare requests to send to Datanode for processing the + * physical block movements.
*/ @InterfaceAudience.Private -public class StoragePolicySatisfier implements Runnable { +public class StoragePolicySatisfier implements SPSService, Runnable { public static final Logger LOG = LoggerFactory.getLogger(StoragePolicySatisfier.class); private Daemon storagePolicySatisfierThread; - private final BlockStorageMovementNeeded storageMovementNeeded; - private final BlockStorageMovementAttemptedItems storageMovementsMonitor; + private BlockStorageMovementNeeded storageMovementNeeded; + private BlockStorageMovementAttemptedItems storageMovementsMonitor; private volatile boolean isRunning = false; private int spsWorkMultiplier; private long blockCount = 0L; private int blockMovementMaxRetry; - private final Context ctxt; + private Context ctxt; + private BlockMoveTaskHandler blockMoveTaskHandler; + private Configuration conf; + public StoragePolicySatisfier(Configuration conf) { + this.conf = conf; + } /** * Represents the collective analysis status for all blocks. */ @@ -125,13 +131,17 @@ public class StoragePolicySatisfier implements Runnable { } } - public StoragePolicySatisfier(Context ctxt) { - this.ctxt = ctxt; - this.storageMovementNeeded = new BlockStorageMovementNeeded(ctxt); - this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(ctxt, + public void init(final Context context, final FileIdCollector fileIDCollector, + final BlockMoveTaskHandler blockMovementTaskHandler) { + this.ctxt = context; + this.storageMovementNeeded = + new BlockStorageMovementNeeded(context, fileIDCollector); + this.storageMovementsMonitor = + new BlockStorageMovementAttemptedItems(this, storageMovementNeeded); - this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(ctxt.getConf()); - this.blockMovementMaxRetry = ctxt.getConf().getInt( + this.blockMoveTaskHandler = blockMovementTaskHandler; + this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(getConf()); + this.blockMovementMaxRetry = getConf().getInt( 
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT); } @@ -139,12 +149,10 @@ public class StoragePolicySatisfier implements Runnable { /** * Start storage policy satisfier demon thread. Also start block storage * movements monitor for retry the attempts if needed. - * - * // TODO: FSDirectory will get removed via HDFS-12911 modularization work. */ - public synchronized void start(boolean reconfigStart, FSDirectory fsd) { + @Override + public synchronized void start(boolean reconfigStart) { isRunning = true; - ctxt.setSPSRunning(this::isRunning); if (ctxt.isMoverRunning()) { isRunning = false; LOG.error( @@ -163,20 +171,14 @@ public class StoragePolicySatisfier implements Runnable { // Ensure that all the previously submitted block movements(if any) have to // be stopped in all datanodes. addDropSPSWorkCommandsToAllDNs(); - storageMovementNeeded.init(fsd); storagePolicySatisfierThread = new Daemon(this); storagePolicySatisfierThread.setName("StoragePolicySatisfier"); storagePolicySatisfierThread.start(); this.storageMovementsMonitor.start(); + this.storageMovementNeeded.activate(); } - /** - * Disables storage policy satisfier by stopping its services. - * - * @param forceStop - * true represents that it should stop SPS service by clearing all - * pending SPS work - */ + @Override public synchronized void disable(boolean forceStop) { isRunning = false; if (storagePolicySatisfierThread == null) { @@ -195,14 +197,15 @@ public class StoragePolicySatisfier implements Runnable { } } - /** - * Timed wait to stop storage policy satisfier daemon threads. 
- */ + @Override public synchronized void stopGracefully() { if (isRunning) { disable(true); } - this.storageMovementsMonitor.stopGracefully(); + + if (this.storageMovementsMonitor != null) { + this.storageMovementsMonitor.stopGracefully(); + } if (storagePolicySatisfierThread == null) { return; @@ -213,10 +216,7 @@ public class StoragePolicySatisfier implements Runnable { } } - /** - * Check whether StoragePolicySatisfier is running. - * @return true if running - */ + @Override public boolean isRunning() { return isRunning; } @@ -239,11 +239,11 @@ public class StoragePolicySatisfier implements Runnable { if(itemInfo.getRetryCount() >= blockMovementMaxRetry){ LOG.info("Failed to satisfy the policy after " + blockMovementMaxRetry + " retries. Removing inode " - + itemInfo.getTrackId() + " from the queue"); + + itemInfo.getFileId() + " from the queue"); storageMovementNeeded.removeItemTrackInfo(itemInfo, false); continue; } - long trackId = itemInfo.getTrackId(); + long trackId = itemInfo.getFileId(); BlocksMovingAnalysis status = null; DatanodeStorageReport[] liveDnReports; BlockStoragePolicy existingStoragePolicy; @@ -273,7 +273,7 @@ public class StoragePolicySatisfier implements Runnable { // be removed on storage movement attempt finished report. 
case BLOCKS_TARGETS_PAIRED: this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo - .getStartId(), itemInfo.getTrackId(), monotonicNow(), + .getStartId(), itemInfo.getFileId(), monotonicNow(), status.assignedBlocks, itemInfo.getRetryCount())); break; case NO_BLOCKS_TARGETS_PAIRED: @@ -282,7 +282,7 @@ public class StoragePolicySatisfier implements Runnable { + " back to retry queue as none of the blocks" + " found its eligible targets."); } - itemInfo.retryCount++; + itemInfo.increRetryCount(); this.storageMovementNeeded.add(itemInfo); break; case FEW_LOW_REDUNDANCY_BLOCKS: @@ -426,7 +426,8 @@ public class StoragePolicySatisfier implements Runnable { for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { // Check for at least one block storage movement has been chosen try { - ctxt.assignBlockMoveTaskToTargetNode(blkMovingInfo); + blockMoveTaskHandler.submitMoveTask(blkMovingInfo, + storageMovementsMonitor); LOG.debug("BlockMovingInfo: {}", blkMovingInfo); assignedBlockIds.add(blkMovingInfo.getBlock()); blockCount++; @@ -611,7 +612,6 @@ public class StoragePolicySatisfier implements Runnable { expected.remove(chosenTarget.storageType); excludeNodes.add(chosenTarget.dn); - // TODO: We can increment scheduled block count for this node? } else { LOG.warn( "Failed to choose target datanode for the required" @@ -830,11 +830,11 @@ public class StoragePolicySatisfier implements Runnable { return; } storageMovementsMonitor - .addReportedMovedBlocks(moveAttemptFinishedBlks.getBlocks()); + .notifyMovementTriedBlocks(moveAttemptFinishedBlks.getBlocks()); } @VisibleForTesting - BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() { + BlockMovementListener getAttemptedItemsMonitor() { return storageMovementsMonitor; } @@ -863,10 +863,6 @@ public class StoragePolicySatisfier implements Runnable { } } - public void addInodeToPendingDirQueue(long id) { - storageMovementNeeded.addToPendingDirQueue(id); - } - /** * Clear queues for given track id. 
*/ @@ -875,57 +871,6 @@ public class StoragePolicySatisfier implements Runnable { } /** - * ItemInfo is a file info object for which need to satisfy the - * policy. - */ - public static class ItemInfo { - private long startId; - private long trackId; - private int retryCount; - - public ItemInfo(long startId, long trackId) { - this.startId = startId; - this.trackId = trackId; - //set 0 when item is getting added first time in queue. - this.retryCount = 0; - } - - public ItemInfo(long startId, long trackId, int retryCount) { - this.startId = startId; - this.trackId = trackId; - this.retryCount = retryCount; - } - - /** - * Return the start inode id of the current track Id. - */ - public long getStartId() { - return startId; - } - - /** - * Return the File inode Id for which needs to satisfy the policy. - */ - public long getTrackId() { - return trackId; - } - - /** - * Returns true if the tracking path is a directory, false otherwise. - */ - public boolean isDir() { - return (startId != trackId); - } - - /** - * Get the attempted retry count of the block for satisfy the policy. - */ - public int getRetryCount() { - return retryCount; - } - } - - /** * This class contains information of an attempted blocks and its last * attempted or reported time stamp. This is used by * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}. 
@@ -977,4 +922,30 @@ public class StoragePolicySatisfier implements Runnable { String path) throws IOException { return storageMovementNeeded.getStatus(ctxt.getFileID(path)); } + + @Override + public void addFileIdToProcess(ItemInfo trackInfo) { + storageMovementNeeded.add(trackInfo); + } + + @Override + public void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList, + boolean scanCompleted) { + getStorageMovementQueue().addAll(startId, itemInfoList, scanCompleted); + } + + @Override + public int processingQueueSize() { + return storageMovementNeeded.size(); + } + + @Override + public Configuration getConf() { + return conf; + } + + @VisibleForTesting + public BlockStorageMovementNeeded getStorageMovementQueue() { + return storageMovementNeeded; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0c8e48c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java index f9762a8..3e2c324 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java @@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo; -import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo; import org.junit.After; import 
org.junit.Before; import org.junit.Test; @@ -49,12 +48,14 @@ public class TestBlockStorageMovementAttemptedItems { public void setup() throws Exception { Configuration config = new HdfsConfiguration(); Context ctxt = Mockito.mock(Context.class); - Mockito.when(ctxt.getConf()).thenReturn(config); + SPSService sps = Mockito.mock(StoragePolicySatisfier.class); + Mockito.when(sps.getConf()).thenReturn(config); Mockito.when(ctxt.isRunning()).thenReturn(true); Mockito.when(ctxt.isInSafeMode()).thenReturn(false); Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true); - unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(ctxt); - bsmAttemptedItems = new BlockStorageMovementAttemptedItems(ctxt, + unsatisfiedStorageMovementFiles = + new BlockStorageMovementNeeded(ctxt, null); + bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps, unsatisfiedStorageMovementFiles); } @@ -73,7 +74,7 @@ public class TestBlockStorageMovementAttemptedItems { while (monotonicNow() < (stopTime)) { ItemInfo ele = null; while ((ele = unsatisfiedStorageMovementFiles.get()) != null) { - if (item == ele.getTrackId()) { + if (item == ele.getFileId()) { isItemFound = true; break; } @@ -99,7 +100,7 @@ public class TestBlockStorageMovementAttemptedItems { bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0)); Block[] blockArray = new Block[blocks.size()]; blocks.toArray(blockArray); - bsmAttemptedItems.addReportedMovedBlocks(blockArray); + bsmAttemptedItems.notifyMovementTriedBlocks(blockArray); assertEquals("Failed to receive result!", 1, bsmAttemptedItems.getMovementFinishedBlocksCount()); } @@ -137,7 +138,7 @@ public class TestBlockStorageMovementAttemptedItems { .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0)); Block[] blksMovementReport = new Block[1]; blksMovementReport[0] = new Block(item); - bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport); + bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport); // start block 
movement report monitor thread bsmAttemptedItems.start(); @@ -162,7 +163,7 @@ public class TestBlockStorageMovementAttemptedItems { .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0)); Block[] blksMovementReport = new Block[1]; blksMovementReport[0] = new Block(item); - bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport); + bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport); Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out @@ -190,7 +191,7 @@ public class TestBlockStorageMovementAttemptedItems { .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0)); Block[] blksMovementReport = new Block[1]; blksMovementReport[0] = new Block(item); - bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport); + bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport); assertFalse( "Should not add in queue again if it is not there in" + " storageMovementAttemptedItems", --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org