HDFS-13381: [SPS] Use DFSUtilClient#makePathFromFileId() to prepare the satisfier file path. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8eabfc06 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8eabfc06 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8eabfc06 Branch: refs/heads/HDFS-10285 Commit: 8eabfc0698304b691c51512aaa511be21a74d56f Parents: 93a0454 Author: Uma Maheswara Rao G <uma.ganguma...@intel.com> Authored: Mon Jul 2 17:22:00 2018 -0700 Committer: Rakesh Radhakrishnan <rake...@apache.org> Committed: Tue Jul 31 12:11:01 2018 +0530 ---------------------------------------------------------------------- .../NamenodeProtocolServerSideTranslatorPB.java | 2 +- .../NamenodeProtocolTranslatorPB.java | 2 +- .../server/blockmanagement/BlockManager.java | 2 +- .../hdfs/server/namenode/FSNamesystem.java | 11 --- .../hdfs/server/namenode/NameNodeRpcServer.java | 8 +- .../hadoop/hdfs/server/namenode/Namesystem.java | 9 --- .../sps/BlockStorageMovementAttemptedItems.java | 72 +++++++---------- .../sps/BlockStorageMovementNeeded.java | 61 ++++++-------- .../hdfs/server/namenode/sps/Context.java | 45 ++++++++--- .../namenode/sps/DatanodeCacheManager.java | 4 +- .../hdfs/server/namenode/sps/FileCollector.java | 13 +-- .../namenode/sps/IntraSPSNameNodeContext.java | 54 +++++++++---- .../sps/IntraSPSNameNodeFileIdCollector.java | 14 ++-- .../hdfs/server/namenode/sps/ItemInfo.java | 34 ++++---- .../hdfs/server/namenode/sps/SPSService.java | 31 +++---- .../namenode/sps/StoragePolicySatisfier.java | 61 +++++--------- .../sps/StoragePolicySatisfyManager.java | 20 ++--- .../hdfs/server/protocol/NamenodeProtocol.java | 2 +- .../sps/ExternalSPSBlockMoveTaskHandler.java | 4 +- .../hdfs/server/sps/ExternalSPSContext.java | 85 ++++++++++++++++---- .../sps/ExternalSPSFilePathCollector.java | 36 +++++---- .../sps/ExternalStoragePolicySatisfier.java | 30 +------ .../src/main/proto/NamenodeProtocol.proto | 2 +- .../TestBlockStorageMovementAttemptedItems.java | 16 ++-- 
.../sps/TestStoragePolicySatisfier.java | 66 +++++---------- ...stStoragePolicySatisfierWithStripedFile.java | 41 ++++------ .../sps/TestExternalStoragePolicySatisfier.java | 35 +++----- 27 files changed, 346 insertions(+), 414 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java index e4283c6..d9367fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java @@ -265,7 +265,7 @@ public class NamenodeProtocolServerSideTranslatorPB implements RpcController controller, GetNextSPSPathRequestProto request) throws ServiceException { try { - String nextSPSPath = impl.getNextSPSPath(); + Long nextSPSPath = impl.getNextSPSPath(); if (nextSPSPath == null) { return GetNextSPSPathResponseProto.newBuilder().build(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java index 97dee9b..3bd5986 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java @@ -267,7 +267,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol, } @Override - public String getNextSPSPath() throws IOException { + public Long getNextSPSPath() throws IOException { GetNextSPSPathRequestProto req = GetNextSPSPathRequestProto.newBuilder().build(); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index caf250f..cb0de67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -3897,7 +3897,7 @@ public class BlockManager implements BlockStatsMXBean { private void notifyStorageMovementAttemptFinishedBlk( DatanodeStorageInfo storageInfo, Block block) { if (getSPSManager() != null) { - SPSService<Long> sps = getSPSManager().getInternalSPSService(); + SPSService sps = getSPSManager().getInternalSPSService(); if (sps.isRunning()) { sps.notifyStorageMovementAttemptFinishedBlk( storageInfo.getDatanodeDescriptor(), storageInfo.getStorageType(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 9aa7a7e..ddb438d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3202,17 +3202,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return stat; } - @Override - public String getFilePath(Long inodeId) { - readLock(); - try { - INode inode = getFSDirectory().getInode(inodeId); - return inode == null ? null : inode.getFullPathName(); - } finally { - readUnlock(); - } - } - /** * Returns true if the file is closed */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 57e827d..2f3325f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -2569,7 +2569,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { } @Override - public String getNextSPSPath() throws IOException { + public Long getNextSPSPath() throws IOException { checkNNStartup(); String operationName = "getNextSPSPath"; namesystem.checkSuperuserPrivilege(operationName); @@ -2589,10 +2589,6 @@ public class NameNodeRpcServer implements NamenodeProtocols { throw new IOException("SPS service mode 
is " + spsMode + ", so " + "external SPS service is not allowed to fetch the path Ids"); } - Long pathId = spsMgr.getNextPathId(); - if (pathId == null) { - return null; - } - return namesystem.getFilePath(pathId); + return namesystem.getBlockManager().getSPSManager().getNextPathId(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java index fc933b7..82af4d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java @@ -77,13 +77,4 @@ public interface Namesystem extends RwLock, SafeMode { */ HdfsFileStatus getFileInfo(String filePath, boolean resolveLink, boolean needLocation) throws IOException; - - /** - * Gets the file path corresponds to the given file id. 
- * - * @param inodeId - * file id - * @return string file path - */ - String getFilePath(Long inodeId); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java index 5b25491..df4f0dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java @@ -52,13 +52,8 @@ import com.google.common.annotations.VisibleForTesting; * entries from tracking. If there is no DN reports about movement attempt * finished for a longer time period, then such items will retries automatically * after timeout. The default timeout would be 5 minutes. - * - * @param <T> - * is identifier of inode or full path name of inode. Internal sps will - * use the file inodeId for the block movement. External sps will use - * file string path representation for the block movement. */ -public class BlockStorageMovementAttemptedItems<T> { +public class BlockStorageMovementAttemptedItems { private static final Logger LOG = LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class); @@ -66,14 +61,14 @@ public class BlockStorageMovementAttemptedItems<T> { * A map holds the items which are already taken for blocks movements * processing and sent to DNs. 
*/ - private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems; + private final List<AttemptedItemInfo> storageMovementAttemptedItems; private Map<Block, Set<StorageTypeNodePair>> scheduledBlkLocs; // Maintains separate Queue to keep the movement finished blocks. This Q // is used to update the storageMovementAttemptedItems list asynchronously. private final BlockingQueue<Block> movementFinishedBlocks; private volatile boolean monitorRunning = true; private Daemon timerThread = null; - private BlockMovementListener blkMovementListener; + private final Context context; // // It might take anywhere between 5 to 10 minutes before // a request is timed out. @@ -85,12 +80,12 @@ public class BlockStorageMovementAttemptedItems<T> { // a request is timed out. // private long minCheckTimeout = 1 * 60 * 1000; // minimum value - private BlockStorageMovementNeeded<T> blockStorageMovementNeeded; - private final SPSService<T> service; + private BlockStorageMovementNeeded blockStorageMovementNeeded; + private final SPSService service; - public BlockStorageMovementAttemptedItems(SPSService<T> service, - BlockStorageMovementNeeded<T> unsatisfiedStorageMovementFiles, - BlockMovementListener blockMovementListener) { + public BlockStorageMovementAttemptedItems(SPSService service, + BlockStorageMovementNeeded unsatisfiedStorageMovementFiles, + Context context) { this.service = service; long recheckTimeout = this.service.getConf().getLong( DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, @@ -106,19 +101,27 @@ public class BlockStorageMovementAttemptedItems<T> { storageMovementAttemptedItems = new ArrayList<>(); scheduledBlkLocs = new HashMap<>(); movementFinishedBlocks = new LinkedBlockingQueue<>(); - this.blkMovementListener = blockMovementListener; + this.context = context; } /** * Add item to block storage movement attempted items map which holds the * tracking/blockCollection id versus time stamp. 
* - * @param itemInfo - * - tracking info + * @param startPathId + * - start satisfier path identifier + * @param fileId + * - file identifier + * @param monotonicNow + * - time now + * @param assignedBlocks + * - assigned blocks for block movement + * @param retryCount + * - retry count */ - public void add(T startPath, T file, long monotonicNow, + public void add(long startPathId, long fileId, long monotonicNow, Map<Block, Set<StorageTypeNodePair>> assignedBlocks, int retryCount) { - AttemptedItemInfo<T> itemInfo = new AttemptedItemInfo<T>(startPath, file, + AttemptedItemInfo itemInfo = new AttemptedItemInfo(startPathId, fileId, monotonicNow, assignedBlocks.keySet(), retryCount); synchronized (storageMovementAttemptedItems) { storageMovementAttemptedItems.add(itemInfo); @@ -161,11 +164,9 @@ public class BlockStorageMovementAttemptedItems<T> { boolean foundType = dn.getStorageType().equals(type); if (foundDn && foundType) { blkLocs.remove(dn); - // listener if it is plugged-in - if (blkMovementListener != null) { - blkMovementListener - .notifyMovementTriedBlocks(new Block[] {reportedBlock}); - } + Block[] mFinishedBlocks = new Block[1]; + mFinishedBlocks[0] = reportedBlock; + context.notifyMovementTriedBlocks(mFinishedBlocks); // All the block locations has reported. 
if (blkLocs.size() <= 0) { movementFinishedBlocks.add(reportedBlock); @@ -244,15 +245,15 @@ public class BlockStorageMovementAttemptedItems<T> { @VisibleForTesting void blocksStorageMovementUnReportedItemsCheck() { synchronized (storageMovementAttemptedItems) { - Iterator<AttemptedItemInfo<T>> iter = storageMovementAttemptedItems + Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems .iterator(); long now = monotonicNow(); while (iter.hasNext()) { - AttemptedItemInfo<T> itemInfo = iter.next(); + AttemptedItemInfo itemInfo = iter.next(); if (now > itemInfo.getLastAttemptedOrReportedTime() + selfRetryTimeout) { - T file = itemInfo.getFile(); - ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(), file, + long file = itemInfo.getFile(); + ItemInfo candidate = new ItemInfo(itemInfo.getStartPath(), file, itemInfo.getRetryCount() + 1); blockStorageMovementNeeded.add(candidate); iter.remove(); @@ -272,13 +273,13 @@ public class BlockStorageMovementAttemptedItems<T> { // Update attempted items list for (Block blk : finishedBlks) { synchronized (storageMovementAttemptedItems) { - Iterator<AttemptedItemInfo<T>> iterator = storageMovementAttemptedItems + Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems .iterator(); while (iterator.hasNext()) { - AttemptedItemInfo<T> attemptedItemInfo = iterator.next(); + AttemptedItemInfo attemptedItemInfo = iterator.next(); attemptedItemInfo.getBlocks().remove(blk); if (attemptedItemInfo.getBlocks().isEmpty()) { - blockStorageMovementNeeded.add(new ItemInfo<T>( + blockStorageMovementNeeded.add(new ItemInfo( attemptedItemInfo.getStartPath(), attemptedItemInfo.getFile(), attemptedItemInfo.getRetryCount() + 1)); iterator.remove(); @@ -309,15 +310,4 @@ public class BlockStorageMovementAttemptedItems<T> { scheduledBlkLocs.clear(); } } - - /** - * Sets external listener for testing. 
- * - * @param blkMoveListener - * block movement listener callback object - */ - @VisibleForTesting - void setBlockMovementListener(BlockMovementListener blkMoveListener) { - this.blkMovementListener = blkMoveListener; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java index a194876..c95dcda 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java @@ -43,47 +43,38 @@ import com.google.common.annotations.VisibleForTesting; * schedule the block collection IDs for movement. It track the info of * scheduled items and remove the SPS xAttr from the file/Directory once * movement is success. - * - * @param <T> - * is identifier of inode or full path name of inode. Internal sps will - * use the file inodeId for the block movement. External sps will use - * file string path representation for the block movement. */ @InterfaceAudience.Private -public class BlockStorageMovementNeeded<T> { +public class BlockStorageMovementNeeded { public static final Logger LOG = LoggerFactory.getLogger(BlockStorageMovementNeeded.class); - private final Queue<ItemInfo<T>> storageMovementNeeded = - new LinkedList<ItemInfo<T>>(); + private final Queue<ItemInfo> storageMovementNeeded = + new LinkedList<ItemInfo>(); /** * Map of startPath and number of child's. Number of child's indicate the * number of files pending to satisfy the policy. 
*/ - private final Map<T, DirPendingWorkInfo> pendingWorkForDirectory = + private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory = new HashMap<>(); - private final Map<T, StoragePolicySatisfyPathStatusInfo> spsStatus = + private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus = new ConcurrentHashMap<>(); - private final Context<T> ctxt; + private final Context ctxt; private Daemon pathIdCollector; - private FileCollector<T> fileCollector; - private SPSPathIdProcessor pathIDProcessor; // Amount of time to cache the SUCCESS status of path before turning it to // NOT_AVAILABLE. private static long statusClearanceElapsedTimeMs = 300000; - public BlockStorageMovementNeeded(Context<T> context, - FileCollector<T> fileCollector) { + public BlockStorageMovementNeeded(Context context) { this.ctxt = context; - this.fileCollector = fileCollector; pathIDProcessor = new SPSPathIdProcessor(); } @@ -94,7 +85,7 @@ public class BlockStorageMovementNeeded<T> { * @param trackInfo * - track info for satisfy the policy */ - public synchronized void add(ItemInfo<T> trackInfo) { + public synchronized void add(ItemInfo trackInfo) { spsStatus.put(trackInfo.getFile(), new StoragePolicySatisfyPathStatusInfo( StoragePolicySatisfyPathStatus.IN_PROGRESS)); @@ -114,7 +105,7 @@ public class BlockStorageMovementNeeded<T> { * scan. */ @VisibleForTesting - public synchronized void addAll(T startPath, List<ItemInfo<T>> itemInfoList, + public synchronized void addAll(long startPath, List<ItemInfo> itemInfoList, boolean scanCompleted) { storageMovementNeeded.addAll(itemInfoList); updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted); @@ -131,7 +122,7 @@ public class BlockStorageMovementNeeded<T> { * elements to scan. 
*/ @VisibleForTesting - public synchronized void add(ItemInfo<T> itemInfo, boolean scanCompleted) { + public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) { storageMovementNeeded.add(itemInfo); // This represents sps start id is file, so no need to update pending dir // stats. @@ -141,7 +132,7 @@ public class BlockStorageMovementNeeded<T> { updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted); } - private void updatePendingDirScanStats(T startPath, int numScannedFiles, + private void updatePendingDirScanStats(long startPath, int numScannedFiles, boolean scanCompleted) { DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startPath); if (pendingWork == null) { @@ -160,7 +151,7 @@ public class BlockStorageMovementNeeded<T> { * * @return satisfier files */ - public synchronized ItemInfo<T> get() { + public synchronized ItemInfo get() { return storageMovementNeeded.poll(); } @@ -181,12 +172,12 @@ public class BlockStorageMovementNeeded<T> { * Decrease the pending child count for directory once one file blocks moved * successfully. Remove the SPS xAttr if pending child count is zero. */ - public synchronized void removeItemTrackInfo(ItemInfo<T> trackInfo, + public synchronized void removeItemTrackInfo(ItemInfo trackInfo, boolean isSuccess) throws IOException { if (trackInfo.isDir()) { // If track is part of some start inode then reduce the pending // directory work count. - T startId = trackInfo.getStartPath(); + long startId = trackInfo.getStartPath(); if (!ctxt.isFileExist(startId)) { // directory deleted just remove it. 
this.pendingWorkForDirectory.remove(startId); @@ -212,11 +203,11 @@ public class BlockStorageMovementNeeded<T> { } } - public synchronized void clearQueue(T trackId) { + public synchronized void clearQueue(long trackId) { ctxt.removeSPSPathId(trackId); - Iterator<ItemInfo<T>> iterator = storageMovementNeeded.iterator(); + Iterator<ItemInfo> iterator = storageMovementNeeded.iterator(); while (iterator.hasNext()) { - ItemInfo<T> next = iterator.next(); + ItemInfo next = iterator.next(); if (next.getFile() == trackId) { iterator.remove(); } @@ -227,7 +218,7 @@ public class BlockStorageMovementNeeded<T> { /** * Mark inode status as SUCCESS in map. */ - private void updateStatus(T startId, boolean isSuccess){ + private void updateStatus(long startId, boolean isSuccess){ StoragePolicySatisfyPathStatusInfo spsStatusInfo = spsStatus.get(startId); if (spsStatusInfo == null) { @@ -249,7 +240,7 @@ public class BlockStorageMovementNeeded<T> { */ public synchronized void clearQueuesWithNotification() { // Remove xAttr from directories - T trackId; + Long trackId; while ((trackId = ctxt.getNextSPSPath()) != null) { try { // Remove xAttr for file @@ -261,7 +252,7 @@ public class BlockStorageMovementNeeded<T> { // File's directly added to storageMovementNeeded, So try to remove // xAttr for file - ItemInfo<T> itemInfo; + ItemInfo itemInfo; while ((itemInfo = get()) != null) { try { // Remove xAttr for file @@ -287,7 +278,7 @@ public class BlockStorageMovementNeeded<T> { public void run() { LOG.info("Starting SPSPathIdProcessor!."); long lastStatusCleanTime = 0; - T startINode = null; + Long startINode = null; while (ctxt.isRunning()) { try { if (!ctxt.isInSafeMode()) { @@ -301,7 +292,7 @@ public class BlockStorageMovementNeeded<T> { spsStatus.put(startINode, new StoragePolicySatisfyPathStatusInfo( StoragePolicySatisfyPathStatus.IN_PROGRESS)); - fileCollector.scanAndCollectFiles(startINode); + ctxt.scanAndCollectFiles(startINode); // check if directory was empty and no child added 
to queue DirPendingWorkInfo dirPendingWorkInfo = pendingWorkForDirectory.get(startINode); @@ -339,9 +330,9 @@ public class BlockStorageMovementNeeded<T> { } private synchronized void cleanSPSStatus() { - for (Iterator<Entry<T, StoragePolicySatisfyPathStatusInfo>> it = spsStatus - .entrySet().iterator(); it.hasNext();) { - Entry<T, StoragePolicySatisfyPathStatusInfo> entry = it.next(); + for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it = + spsStatus.entrySet().iterator(); it.hasNext();) { + Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next(); if (entry.getValue().canRemove()) { it.remove(); } @@ -477,7 +468,7 @@ public class BlockStorageMovementNeeded<T> { return statusClearanceElapsedTimeMs; } - public void markScanCompletedForDir(T inode) { + public void markScanCompletedForDir(long inode) { DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inode); if (pendingWork != null) { pendingWork.markScanCompleted(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java index 55a1f7a..d538374 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java @@ -24,24 +24,21 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.protocol.Block; import 
org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.AccessControlException; /** * An interface for the communication between SPS and Namenode module. - * - * @param <T> - * is identifier of inode or full path name of inode. Internal sps will - * use the file inodeId for the block movement. External sps will use - * file string path representation for the block movement. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public interface Context<T> { +public interface Context { /** * Returns true if the SPS is running, false otherwise. @@ -85,7 +82,7 @@ public interface Context<T> { * - file info * @return true if the given file exists, false otherwise. */ - boolean isFileExist(T filePath); + boolean isFileExist(long filePath); /** * Gets the storage policy details for the given policy ID. @@ -108,7 +105,7 @@ public interface Context<T> { * - user invoked satisfier path * @throws IOException */ - void removeSPSHint(T spsPath) throws IOException; + void removeSPSHint(long spsPath) throws IOException; /** * Gets the number of live datanodes in the cluster. @@ -124,7 +121,7 @@ public interface Context<T> { * file path * @return file status metadata information */ - HdfsFileStatus getFileInfo(T file) throws IOException; + HdfsFileStatus getFileInfo(long file) throws IOException; /** * Returns all the live datanodes and its storage details. @@ -137,15 +134,41 @@ public interface Context<T> { /** * @return next SPS path info to process. */ - T getNextSPSPath(); + Long getNextSPSPath(); /** * Removes the SPS path id. 
*/ - void removeSPSPathId(T pathId); + void removeSPSPathId(long pathId); /** * Removes all SPS path ids. */ void removeAllSPSPathIds(); + + /** + * Do scan and collects the files under that directory and adds to the given + * BlockStorageMovementNeeded. + * + * @param filePath + * file path + */ + void scanAndCollectFiles(long filePath) + throws IOException, InterruptedException; + + /** + * Handles the block move tasks. BlockMovingInfo must contain the required + * info to move the block, that source location, destination location and + * storage types. + */ + void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException; + + /** + * This can be used to notify to the SPS about block movement attempt + * finished. Then SPS will re-check whether it needs retry or not. + * + * @param moveAttemptFinishedBlks + * list of movement attempt finished blocks + */ + void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java index 3531ecd..d4e514b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java @@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory; * interval. 
*/ @InterfaceAudience.Private -public class DatanodeCacheManager<T> { +public class DatanodeCacheManager { private static final Logger LOG = LoggerFactory .getLogger(DatanodeCacheManager.class); @@ -78,7 +78,7 @@ public class DatanodeCacheManager<T> { * @throws IOException */ public DatanodeMap getLiveDatanodeStorageReport( - Context<T> spsContext) throws IOException { + Context spsContext) throws IOException { long now = Time.monotonicNow(); long elapsedTimeMs = now - lastAccessedTime; boolean refreshNeeded = elapsedTimeMs >= refreshIntervalMs; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java index dceb5fa..fa8b31b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java @@ -26,23 +26,18 @@ import org.apache.hadoop.classification.InterfaceStability; /** * An interface for scanning the directory recursively and collect files * under the given directory. - * - * @param <T> - * is identifier of inode or full path name of inode. Internal sps will - * use the file inodeId for the block movement. External sps will use - * file string path representation for the block movement. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public interface FileCollector<T> { +public interface FileCollector { /** * This method can be used to scan and collects the files under that * directory and adds to the given BlockStorageMovementNeeded. 
* - * @param filePath - * - file path + * @param path + * - file path id */ - void scanAndCollectFiles(T filePath) + void scanAndCollectFiles(long path) throws IOException, InterruptedException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java index a77fe85..2bf4810 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java @@ -20,11 +20,14 @@ package org.apache.hadoop.hdfs.server.namenode.sps; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import java.io.IOException; +import java.util.Arrays; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -32,6 +35,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.INode; import 
org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap; import org.apache.hadoop.net.NetworkTopology; @@ -45,20 +49,26 @@ import org.slf4j.LoggerFactory; * movements to satisfy the storage policy. */ @InterfaceAudience.Private -public class IntraSPSNameNodeContext implements Context<Long> { +public class IntraSPSNameNodeContext implements Context { private static final Logger LOG = LoggerFactory .getLogger(IntraSPSNameNodeContext.class); private final Namesystem namesystem; private final BlockManager blockManager; - private SPSService<Long> service; + private SPSService service; + private final FileCollector fileCollector; + private final BlockMoveTaskHandler blockMoveTaskHandler; public IntraSPSNameNodeContext(Namesystem namesystem, - BlockManager blockManager, SPSService<Long> service) { + BlockManager blockManager, SPSService service) { this.namesystem = namesystem; this.blockManager = blockManager; this.service = service; + fileCollector = new IntraSPSNameNodeFileIdCollector( + namesystem.getFSDirectory(), service); + blockMoveTaskHandler = new IntraSPSNameNodeBlockMoveTaskHandler( + blockManager, namesystem); } @Override @@ -67,17 +77,12 @@ public class IntraSPSNameNodeContext implements Context<Long> { } /** - * @return object containing information regarding the file or null if file - * not found. + * @return object containing information regarding the file. 
*/ @Override - public HdfsFileStatus getFileInfo(Long inodeID) throws IOException { - String filePath = namesystem.getFilePath(inodeID); - if (StringUtils.isBlank(filePath)) { - LOG.debug("File with inodeID:{} doesn't exists!", inodeID); - return null; - } - return namesystem.getFileInfo(filePath, true, true); + public HdfsFileStatus getFileInfo(long inodeID) throws IOException { + Path filePath = DFSUtilClient.makePathFromFileId(inodeID); + return namesystem.getFileInfo(filePath.toString(), true, true); } @Override @@ -93,12 +98,12 @@ public class IntraSPSNameNodeContext implements Context<Long> { } @Override - public boolean isFileExist(Long inodeId) { + public boolean isFileExist(long inodeId) { return namesystem.getFSDirectory().getInode(inodeId) != null; } @Override - public void removeSPSHint(Long inodeId) throws IOException { + public void removeSPSHint(long inodeId) throws IOException { this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY); } @@ -156,7 +161,7 @@ public class IntraSPSNameNodeContext implements Context<Long> { } @Override - public void removeSPSPathId(Long trackId) { + public void removeSPSPathId(long trackId) { blockManager.getSPSManager().removePathId(trackId); } @@ -164,4 +169,21 @@ public class IntraSPSNameNodeContext implements Context<Long> { public void removeAllSPSPathIds() { blockManager.getSPSManager().removeAllPathIds(); } + + @Override + public void scanAndCollectFiles(long filePath) + throws IOException, InterruptedException { + fileCollector.scanAndCollectFiles(filePath); + } + + @Override + public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException { + blockMoveTaskHandler.submitMoveTask(blkMovingInfo); + } + + @Override + public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { + LOG.info("Movement attempted blocks: {}", + Arrays.asList(moveAttemptFinishedBlks)); + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java index 27d9e7d..ea3b96f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java @@ -35,16 +35,16 @@ import org.apache.hadoop.hdfs.server.namenode.INode; */ @InterfaceAudience.Private public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser - implements FileCollector<Long> { + implements FileCollector { private int maxQueueLimitToScan; - private final SPSService <Long> service; + private final SPSService service; private int remainingCapacity = 0; - private List<ItemInfo<Long>> currentBatch; + private List<ItemInfo> currentBatch; public IntraSPSNameNodeFileIdCollector(FSDirectory dir, - SPSService<Long> service) { + SPSService service) { super(dir); this.service = service; this.maxQueueLimitToScan = service.getConf().getInt( @@ -64,7 +64,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser return false; } if (inode.isFile() && inode.asFile().numBlocks() != 0) { - currentBatch.add(new ItemInfo<Long>( + currentBatch.add(new ItemInfo( ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId())); remainingCapacity--; } @@ -120,7 +120,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser } @Override - public void scanAndCollectFiles(final Long startINodeId) + public void 
scanAndCollectFiles(final long startINodeId) throws IOException, InterruptedException { FSDirectory fsd = getFSDirectory(); INode startInode = fsd.getInode(startINodeId); @@ -131,7 +131,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser } if (startInode.isFile()) { currentBatch - .add(new ItemInfo<Long>(startInode.getId(), startInode.getId())); + .add(new ItemInfo(startInode.getId(), startInode.getId())); } else { readLock(); // NOTE: this lock will not be held for full directory scanning. It is http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java index bd8ab92..949e3fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java @@ -21,28 +21,26 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; /** - * ItemInfo is a file info object for which need to satisfy the policy. For - * internal satisfier service, it uses inode id which is Long datatype. For the - * external satisfier service, it uses the full string representation of the - * path. + * ItemInfo is a file info object for which need to satisfy the policy. 
*/ @InterfaceAudience.Private @InterfaceStability.Evolving -public class ItemInfo<T> { - private T startPath; - private T file; +public class ItemInfo { + private long startPathId; + private long fileId; private int retryCount; - public ItemInfo(T startPath, T file) { - this.startPath = startPath; - this.file = file; + public ItemInfo(long startPathId, long fileId) { + this.startPathId = startPathId; + this.fileId = fileId; // set 0 when item is getting added first time in queue. this.retryCount = 0; } - public ItemInfo(final T startPath, final T file, final int retryCount) { - this.startPath = startPath; - this.file = file; + public ItemInfo(final long startPathId, final long fileId, + final int retryCount) { + this.startPathId = startPathId; + this.fileId = fileId; this.retryCount = retryCount; } @@ -50,22 +48,22 @@ public class ItemInfo<T> { * Returns the start path of the current file. This indicates that SPS * was invoked on this path. */ - public T getStartPath() { - return startPath; + public long getStartPath() { + return startPathId; } /** * Returns the file for which needs to satisfy the policy. */ - public T getFile() { - return file; + public long getFile() { + return fileId; } /** * Returns true if the tracking path is a directory, false otherwise. 
*/ public boolean isDir() { - return !startPath.equals(file); + return !(startPathId == fileId); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java index 5032377..86634d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java @@ -29,15 +29,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; /** * An interface for SPSService, which exposes life cycle and processing APIs. - * - * @param <T> - * is identifier of inode or full path name of inode. Internal sps will - * use the file inodeId for the block movement. External sps will use - * file string path representation for the block movement. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public interface SPSService<T> { +public interface SPSService { /** * Initializes the helper services. @@ -45,16 +40,8 @@ public interface SPSService<T> { * @param ctxt * - context is an helper service to provide communication channel * between NN and SPS - * @param fileCollector - * - a helper service for scanning the files under a given directory - * id - * @param handler - * - a helper service for moving the blocks - * @param blkMovementListener - * - listener to know about block movement attempt completion */ - void init(Context<T> ctxt, FileCollector<T> fileCollector, - BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener); + void init(Context ctxt); /** * Starts the SPS service. 
Make sure to initialize the helper services before @@ -94,19 +81,19 @@ public interface SPSService<T> { * @param itemInfo * file info object for which need to satisfy the policy */ - void addFileToProcess(ItemInfo<T> itemInfo, boolean scanCompleted); + void addFileToProcess(ItemInfo itemInfo, boolean scanCompleted); /** * Adds all the Item information(file etc) to processing queue. * - * @param startPath - * - directory/file, on which SPS was called. + * @param startPathId + * - directoryId/fileId, on which SPS was called. * @param itemInfoList * - list of item infos * @param scanCompleted * - whether the scanning of directory fully done with itemInfoList */ - void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList, + void addAllFilesToProcess(long startPathId, List<ItemInfo> itemInfoList, boolean scanCompleted); /** @@ -117,7 +104,7 @@ public interface SPSService<T> { /** * Clear inodeId present in the processing queue. */ - void clearQueue(T spsPath); + void clearQueue(long spsPath); /** * @return the configuration. @@ -128,9 +115,9 @@ public interface SPSService<T> { * Marks the scanning of directory if finished. 
* * @param spsPath - * - satisfier path + * - satisfier path id */ - void markScanCompletedForPath(T spsPath); + void markScanCompletedForPath(long spsPath); /** * Given node is reporting that it received a certain movement attempt http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java index cbd6001..4af6c8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -78,20 +78,19 @@ import com.google.common.base.Preconditions; * physical block movements. 
*/ @InterfaceAudience.Private -public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { +public class StoragePolicySatisfier implements SPSService, Runnable { public static final Logger LOG = LoggerFactory.getLogger(StoragePolicySatisfier.class); private Daemon storagePolicySatisfierThread; - private BlockStorageMovementNeeded<T> storageMovementNeeded; - private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor; + private BlockStorageMovementNeeded storageMovementNeeded; + private BlockStorageMovementAttemptedItems storageMovementsMonitor; private volatile boolean isRunning = false; private int spsWorkMultiplier; private long blockCount = 0L; private int blockMovementMaxRetry; - private Context<T> ctxt; - private BlockMoveTaskHandler blockMoveTaskHandler; + private Context ctxt; private final Configuration conf; - private DatanodeCacheManager<T> dnCacheMgr; + private DatanodeCacheManager dnCacheMgr; public StoragePolicySatisfier(Configuration conf) { this.conf = conf; @@ -137,16 +136,11 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { } } - public void init(final Context<T> context, - final FileCollector<T> fileIDCollector, - final BlockMoveTaskHandler blockMovementTaskHandler, - final BlockMovementListener blockMovementListener) { + public void init(final Context context) { this.ctxt = context; - this.storageMovementNeeded = new BlockStorageMovementNeeded<T>(context, - fileIDCollector); - this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems<T>( - this, storageMovementNeeded, blockMovementListener); - this.blockMoveTaskHandler = blockMovementTaskHandler; + this.storageMovementNeeded = new BlockStorageMovementNeeded(context); + this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems( + this, storageMovementNeeded, context); this.spsWorkMultiplier = getSPSWorkMultiplier(getConf()); this.blockMovementMaxRetry = getConf().getInt( 
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, @@ -191,7 +185,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { storagePolicySatisfierThread.start(); this.storageMovementsMonitor.start(); this.storageMovementNeeded.activate(); - dnCacheMgr = new DatanodeCacheManager<T>(conf); + dnCacheMgr = new DatanodeCacheManager(conf); } @Override @@ -259,7 +253,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { continue; } try { - ItemInfo<T> itemInfo = null; + ItemInfo itemInfo = null; boolean retryItem = false; if (!ctxt.isInSafeMode()) { itemInfo = storageMovementNeeded.get(); @@ -271,7 +265,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { storageMovementNeeded.removeItemTrackInfo(itemInfo, false); continue; } - T trackId = itemInfo.getFile(); + long trackId = itemInfo.getFile(); BlocksMovingAnalysis status = null; BlockStoragePolicy existingStoragePolicy; // TODO: presently, context internally acquire the lock @@ -353,7 +347,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { blockCount = 0L; } if (retryItem) { - itemInfo.increRetryCount(); + // itemInfo.increRetryCount(); this.storageMovementNeeded.add(itemInfo); } } catch (IOException e) { @@ -469,7 +463,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { // Check for at least one block storage movement has been chosen try { - blockMoveTaskHandler.submitMoveTask(blkMovingInfo); + ctxt.submitMoveTask(blkMovingInfo); LOG.debug("BlockMovingInfo: {}", blkMovingInfo); StorageTypeNodePair nodeStorage = new StorageTypeNodePair( blkMovingInfo.getTargetStorageType(), blkMovingInfo.getTarget()); @@ -1092,7 +1086,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { } @VisibleForTesting - public BlockStorageMovementAttemptedItems<T> getAttemptedItemsMonitor() { + public 
BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() { return storageMovementsMonitor; } @@ -1109,7 +1103,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { /** * Clear queues for given track id. */ - public void clearQueue(T trackId) { + public void clearQueue(long trackId) { storageMovementNeeded.clearQueue(trackId); } @@ -1118,7 +1112,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { * attempted or reported time stamp. This is used by * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}. */ - final static class AttemptedItemInfo<T> extends ItemInfo<T> { + final static class AttemptedItemInfo extends ItemInfo { private long lastAttemptedOrReportedTime; private final Set<Block> blocks; @@ -1136,7 +1130,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { * @param retryCount * file retry count */ - AttemptedItemInfo(T rootId, T trackId, + AttemptedItemInfo(long rootId, long trackId, long lastAttemptedOrReportedTime, Set<Block> blocks, int retryCount) { super(rootId, trackId, retryCount); @@ -1179,7 +1173,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { } @Override - public void addFileToProcess(ItemInfo<T> trackInfo, boolean scanCompleted) { + public void addFileToProcess(ItemInfo trackInfo, boolean scanCompleted) { storageMovementNeeded.add(trackInfo, scanCompleted); if (LOG.isDebugEnabled()) { LOG.debug("Added track info for inode {} to block " @@ -1188,7 +1182,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { } @Override - public void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList, + public void addAllFilesToProcess(long startPath, List<ItemInfo> itemInfoList, boolean scanCompleted) { getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted); } @@ -1204,12 +1198,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { } 
@VisibleForTesting - public BlockStorageMovementNeeded<T> getStorageMovementQueue() { + public BlockStorageMovementNeeded getStorageMovementQueue() { return storageMovementNeeded; } @Override - public void markScanCompletedForPath(T inodeId) { + public void markScanCompletedForPath(long inodeId) { getStorageMovementQueue().markScanCompletedForDir(inodeId); } @@ -1278,15 +1272,4 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { "It should be a positive, non-zero integer value."); return spsWorkMultiplier; } - - /** - * Sets external listener for testing. - * - * @param blkMovementListener - * block movement listener callback object - */ - @VisibleForTesting - void setBlockMovementListener(BlockMovementListener blkMovementListener) { - storageMovementsMonitor.setBlockMovementListener(blkMovementListener); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java index 5ec0372..0507d6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java @@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory; public class StoragePolicySatisfyManager { private static final Logger LOG = LoggerFactory .getLogger(StoragePolicySatisfyManager.class); - private final StoragePolicySatisfier<Long> spsService; + private final StoragePolicySatisfier spsService; private final boolean storagePolicyEnabled; private 
volatile StoragePolicySatisfierMode mode; private final Queue<Long> pathsToBeTraveresed; @@ -84,7 +84,7 @@ public class StoragePolicySatisfyManager { pathsToBeTraveresed = new LinkedList<Long>(); // instantiate SPS service by just keeps config reference and not starting // any supporting threads. - spsService = new StoragePolicySatisfier<Long>(conf); + spsService = new StoragePolicySatisfier(conf); this.namesystem = namesystem; this.blkMgr = blkMgr; } @@ -121,10 +121,7 @@ public class StoragePolicySatisfyManager { } // starts internal daemon service inside namenode spsService.init( - new IntraSPSNameNodeContext(namesystem, blkMgr, spsService), - new IntraSPSNameNodeFileIdCollector(namesystem.getFSDirectory(), - spsService), - new IntraSPSNameNodeBlockMoveTaskHandler(blkMgr, namesystem), null); + new IntraSPSNameNodeContext(namesystem, blkMgr, spsService)); spsService.start(false, mode); break; case EXTERNAL: @@ -221,13 +218,8 @@ public class StoragePolicySatisfyManager { mode); return; } - spsService.init( - new IntraSPSNameNodeContext(this.namesystem, this.blkMgr, spsService), - new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(), - spsService), - new IntraSPSNameNodeBlockMoveTaskHandler(this.blkMgr, - this.namesystem), - null); + spsService.init(new IntraSPSNameNodeContext(this.namesystem, this.blkMgr, + spsService)); spsService.start(true, newMode); break; case EXTERNAL: @@ -309,7 +301,7 @@ public class StoragePolicySatisfyManager { /** * @return internal SPS service instance. 
*/ - public SPSService<Long> getInternalSPSService() { + public SPSService getInternalSPSService() { return this.spsService; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java index 5ff6ffd..f80477b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java @@ -209,6 +209,6 @@ public interface NamenodeProtocol { * by External SPS. */ @AtMostOnce - String getNextSPSPath() throws IOException; + Long getNextSPSPath() throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java index f5225d2..3ea0294 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java @@ -76,11 +76,11 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { private final SaslDataTransferClient saslClient; private final BlockStorageMovementTracker 
blkMovementTracker; private Daemon movementTrackerThread; - private final SPSService<String> service; + private final SPSService service; private final BlockDispatcher blkDispatcher; public ExternalSPSBlockMoveTaskHandler(Configuration conf, - NameNodeConnector nnc, SPSService<String> spsService) { + NameNodeConnector nnc, SPSService spsService) { int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); moveExecutor = initializeBlockMoverThreadPool(moverThreads); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java index 1cd4664..189bc2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.sps; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; @@ -27,6 +28,8 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -34,10 +37,14 @@ import 
org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler; +import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener; import org.apache.hadoop.hdfs.server.namenode.sps.Context; +import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector; import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeWithStorage; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.AccessControlException; @@ -49,17 +56,24 @@ import org.slf4j.LoggerFactory; * SPS from Namenode state. 
*/ @InterfaceAudience.Private -public class ExternalSPSContext implements Context<String> { - public static final Logger LOG = - LoggerFactory.getLogger(ExternalSPSContext.class); - private SPSService<String> service; - private NameNodeConnector nnc = null; - private BlockStoragePolicySuite createDefaultSuite = +public class ExternalSPSContext implements Context { + public static final Logger LOG = LoggerFactory + .getLogger(ExternalSPSContext.class); + private final SPSService service; + private final NameNodeConnector nnc; + private final BlockStoragePolicySuite createDefaultSuite = BlockStoragePolicySuite.createDefaultSuite(); + private final FileCollector fileCollector; + private final BlockMoveTaskHandler externalHandler; + private final BlockMovementListener blkMovementListener; - public ExternalSPSContext(SPSService<String> service, NameNodeConnector nnc) { + public ExternalSPSContext(SPSService service, NameNodeConnector nnc) { this.service = service; this.nnc = nnc; + this.fileCollector = new ExternalSPSFilePathCollector(service); + this.externalHandler = new ExternalSPSBlockMoveTaskHandler( + service.getConf(), nnc, service); + this.blkMovementListener = new ExternalBlockMovementListener(); } @Override @@ -119,9 +133,10 @@ public class ExternalSPSContext implements Context<String> { } @Override - public boolean isFileExist(String filePath) { + public boolean isFileExist(long path) { + Path filePath = DFSUtilClient.makePathFromFileId(path); try { - return nnc.getDistributedFileSystem().exists(new Path(filePath)); + return nnc.getDistributedFileSystem().exists(filePath); } catch (IllegalArgumentException | IOException e) { LOG.warn("Exception while getting file is for the given path:{}", filePath, e); @@ -140,8 +155,9 @@ public class ExternalSPSContext implements Context<String> { } @Override - public void removeSPSHint(String inodeId) throws IOException { - nnc.getDistributedFileSystem().removeXAttr(new Path(inodeId), + public void removeSPSHint(long 
inodeId) throws IOException { + Path filePath = DFSUtilClient.makePathFromFileId(inodeId); + nnc.getDistributedFileSystem().removeXAttr(filePath, HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY); } @@ -157,11 +173,12 @@ public class ExternalSPSContext implements Context<String> { } @Override - public HdfsFileStatus getFileInfo(String path) throws IOException { + public HdfsFileStatus getFileInfo(long path) throws IOException { HdfsLocatedFileStatus fileInfo = null; try { + Path filePath = DFSUtilClient.makePathFromFileId(path); fileInfo = nnc.getDistributedFileSystem().getClient() - .getLocatedFileInfo(path, false); + .getLocatedFileInfo(filePath.toString(), false); } catch (FileNotFoundException e) { LOG.debug("Path:{} doesn't exists!", path, e); } @@ -175,7 +192,7 @@ public class ExternalSPSContext implements Context<String> { } @Override - public String getNextSPSPath() { + public Long getNextSPSPath() { try { return nnc.getNNProtocolConnection().getNextSPSPath(); } catch (IOException e) { @@ -185,7 +202,7 @@ public class ExternalSPSContext implements Context<String> { } @Override - public void removeSPSPathId(String pathId) { + public void removeSPSPathId(long pathId) { // We need not specifically implement for external. } @@ -193,4 +210,40 @@ public class ExternalSPSContext implements Context<String> { public void removeAllSPSPathIds() { // We need not specifically implement for external. 
} -} + + @Override + public void scanAndCollectFiles(long path) + throws IOException, InterruptedException { + fileCollector.scanAndCollectFiles(path); + } + + @Override + public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException { + externalHandler.submitMoveTask(blkMovingInfo); + } + + @Override + public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { + // External listener if it is plugged-in + if (blkMovementListener != null) { + blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks); + } + } + + /** + * Its an implementation of BlockMovementListener. + */ + private static class ExternalBlockMovementListener + implements BlockMovementListener { + + private List<Block> actualBlockMovements = new ArrayList<>(); + + @Override + public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { + for (Block block : moveAttemptFinishedBlks) { + actualBlockMovements.add(block); + } + LOG.info("Movement attempted blocks", actualBlockMovements); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java index 9435475..611ff65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; 
+import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -41,14 +42,14 @@ import org.slf4j.LoggerFactory; * representation. */ @InterfaceAudience.Private -public class ExternalSPSFilePathCollector implements FileCollector <String>{ +public class ExternalSPSFilePathCollector implements FileCollector { public static final Logger LOG = LoggerFactory.getLogger(ExternalSPSFilePathCollector.class); private DistributedFileSystem dfs; - private SPSService<String> service; + private SPSService service; private int maxQueueLimitToScan; - public ExternalSPSFilePathCollector(SPSService<String> service) { + public ExternalSPSFilePathCollector(SPSService service) { this.service = service; this.maxQueueLimitToScan = service.getConf().getInt( DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, @@ -72,13 +73,13 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{ * Recursively scan the given path and add the file info to SPS service for * processing. */ - private long processPath(String startID, String childPath) { + private long processPath(Long startID, String childPath) { long pendingWorkCount = 0; // to be satisfied file counter for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) { final DirectoryListing children; try { - children = dfs.getClient().listPaths(childPath, lastReturnedName, - false); + children = dfs.getClient().listPaths(childPath, + lastReturnedName, false); } catch (IOException e) { LOG.warn("Failed to list directory " + childPath + ". 
Ignore the directory and continue.", e); @@ -93,18 +94,18 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{ } for (HdfsFileStatus child : children.getPartialListing()) { - String childFullPath = child.getFullName(childPath); if (child.isFile()) { - service.addFileToProcess( - new ItemInfo<String>(startID, childFullPath), false); + service.addFileToProcess(new ItemInfo(startID, child.getFileId()), + false); checkProcessingQueuesFree(); pendingWorkCount++; // increment to be satisfied file count } else { + String childFullPathName = child.getFullName(childPath); if (child.isDirectory()) { - if (!childFullPath.endsWith(Path.SEPARATOR)) { - childFullPath = childFullPath + Path.SEPARATOR; + if (!childFullPathName.endsWith(Path.SEPARATOR)) { + childFullPathName = childFullPathName + Path.SEPARATOR; } - pendingWorkCount += processPath(startID, childFullPath); + pendingWorkCount += processPath(startID, childFullPathName); } } } @@ -150,11 +151,12 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{ } @Override - public void scanAndCollectFiles(String path) throws IOException { + public void scanAndCollectFiles(long pathId) throws IOException { if (dfs == null) { dfs = getFS(service.getConf()); } - long pendingSatisfyItemsCount = processPath(path, path); + Path filePath = DFSUtilClient.makePathFromFileId(pathId); + long pendingSatisfyItemsCount = processPath(pathId, filePath.toString()); // Check whether the given path contains any item to be tracked // or the no to be satisfied paths. In case of empty list, add the given // inodeId to the 'pendingWorkForDirectory' with empty list so that later @@ -162,10 +164,10 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{ // this path is already satisfied the storage policy. 
if (pendingSatisfyItemsCount <= 0) { LOG.debug("There is no pending items to satisfy the given path " - + "inodeId:{}", path); - service.addAllFilesToProcess(path, new ArrayList<>(), true); + + "inodeId:{}", pathId); + service.addAllFilesToProcess(pathId, new ArrayList<>(), true); } else { - service.markScanCompletedForPath(path); + service.markScanCompletedForPath(pathId); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java index 236b887..af90f0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java @@ -22,7 +22,6 @@ import static org.apache.hadoop.util.ExitUtil.terminate; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; -import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -32,11 +31,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; import 
org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; @@ -68,8 +65,7 @@ public final class ExternalStoragePolicySatisfier { HdfsConfiguration spsConf = new HdfsConfiguration(); // login with SPS keytab secureLogin(spsConf); - StoragePolicySatisfier<String> sps = new StoragePolicySatisfier<String>( - spsConf); + StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf); nnc = getNameNodeConnector(spsConf); boolean spsRunning; @@ -82,12 +78,7 @@ public final class ExternalStoragePolicySatisfier { } ExternalSPSContext context = new ExternalSPSContext(sps, nnc); - ExternalBlockMovementListener blkMoveListener = - new ExternalBlockMovementListener(); - ExternalSPSBlockMoveTaskHandler externalHandler = - new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps); - sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler, - blkMoveListener); + sps.init(context); sps.start(true, StoragePolicySatisfierMode.EXTERNAL); if (sps != null) { sps.join(); @@ -132,21 +123,4 @@ public final class ExternalStoragePolicySatisfier { } } } - - /** - * It is implementation of BlockMovementListener. 
- */ - private static class ExternalBlockMovementListener - implements BlockMovementListener { - - private List<Block> actualBlockMovements = new ArrayList<>(); - - @Override - public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { - for (Block block : moveAttemptFinishedBlks) { - actualBlockMovements.add(block); - } - LOG.info("Movement attempted blocks:{}", actualBlockMovements); - } - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto index 2acc5a8..89edfbf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto @@ -218,7 +218,7 @@ message GetNextSPSPathRequestProto { } message GetNextSPSPathResponseProto { - optional string spsPath = 1; + optional uint64 spsPath = 1; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java index ed1fe92..f85769f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java @@ -45,22 +45,22 @@ import 
org.mockito.Mockito; */ public class TestBlockStorageMovementAttemptedItems { - private BlockStorageMovementAttemptedItems<Long> bsmAttemptedItems; - private BlockStorageMovementNeeded<Long> unsatisfiedStorageMovementFiles; + private BlockStorageMovementAttemptedItems bsmAttemptedItems; + private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles; private final int selfRetryTimeout = 500; @Before public void setup() throws Exception { Configuration config = new HdfsConfiguration(); - Context<Long> ctxt = Mockito.mock(IntraSPSNameNodeContext.class); - SPSService<Long> sps = new StoragePolicySatisfier<Long>(config); + Context ctxt = Mockito.mock(IntraSPSNameNodeContext.class); + SPSService sps = new StoragePolicySatisfier(config); Mockito.when(ctxt.isRunning()).thenReturn(true); Mockito.when(ctxt.isInSafeMode()).thenReturn(false); Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true); unsatisfiedStorageMovementFiles = - new BlockStorageMovementNeeded<Long>(ctxt, null); - bsmAttemptedItems = new BlockStorageMovementAttemptedItems<Long>(sps, - unsatisfiedStorageMovementFiles, null); + new BlockStorageMovementNeeded(ctxt); + bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps, + unsatisfiedStorageMovementFiles, ctxt); } @After @@ -76,7 +76,7 @@ public class TestBlockStorageMovementAttemptedItems { long stopTime = monotonicNow() + (retryTimeout * 2); boolean isItemFound = false; while (monotonicNow() < (stopTime)) { - ItemInfo<Long> ele = null; + ItemInfo ele = null; while ((ele = unsatisfiedStorageMovementFiles.get()) != null) { if (item == ele.getFile()) { isItemFound = true; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org