HDFS-13097: [SPS]: Fix the branch review comments (Part 1). Contributed by Surendra Singh.
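The recurring theme in this patch is that BlockManager's scattered SPS state (the mode flag, the SPSPathIds queue, and the StoragePolicySatisfier instance) moves behind a single StoragePolicySatisfyManager facade. A minimal sketch of the resulting call pattern, using only accessors that appear in the diff below; the wrapper class and wiring are illustrative, and since these are NameNode-internal types it only compiles inside the HDFS source tree:

import java.io.IOException;

import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;

class SpsFacadeSketch {
  // Queues an inode for storage-policy satisfaction via the new facade.
  static void queuePath(BlockManager blockManager, long inodeId)
      throws IOException {
    StoragePolicySatisfyManager spsManager = blockManager.getSPSManager();
    if (spsManager.isEnabled()) {               // was blockManager.isSPSEnabled()
      // Throws IOException when the outstanding path queue limit is exceeded.
      spsManager.verifyOutstandingPathQLimit(); // was verifyOutstandingSPSPathQLimit()
      spsManager.addPathId(inodeId);            // was blockManager.addSPSPathId(inodeId)
    }
  }
}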
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/87e125a6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/87e125a6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/87e125a6 Branch: refs/heads/HDFS-10285 Commit: 87e125a6127b7e8ec599a1778b7bb195d6b186a9 Parents: 12e3125 Author: Uma Maheswara Rao G <uma.ganguma...@intel.com> Authored: Wed Feb 7 02:28:23 2018 -0800 Committer: Rakesh Radhakrishnan <rake...@apache.org> Committed: Tue Jul 31 12:10:43 2018 +0530 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hdfs/DFSClient.java | 4 +- .../hadoop/hdfs/protocol/ClientProtocol.java | 6 +- .../ClientNamenodeProtocolTranslatorPB.java | 14 +- .../src/main/proto/ClientNamenodeProtocol.proto | 8 +- .../federation/router/RouterRpcServer.java | 2 +- .../java/org/apache/hadoop/hdfs/DFSUtil.java | 61 --- ...tNamenodeProtocolServerSideTranslatorPB.java | 16 +- .../server/blockmanagement/BlockManager.java | 255 +----------- .../blockmanagement/DatanodeDescriptor.java | 33 +- .../hdfs/server/common/HdfsServerConstants.java | 2 +- .../datanode/StoragePolicySatisfyWorker.java | 15 +- .../apache/hadoop/hdfs/server/mover/Mover.java | 2 +- .../namenode/FSDirSatisfyStoragePolicyOp.java | 26 +- .../server/namenode/FSDirStatAndListingOp.java | 1 - .../hdfs/server/namenode/FSDirXAttrOp.java | 2 +- .../hdfs/server/namenode/FSDirectory.java | 2 +- .../hdfs/server/namenode/FSNamesystem.java | 46 +-- .../hadoop/hdfs/server/namenode/NameNode.java | 30 +- .../hdfs/server/namenode/NameNodeRpcServer.java | 21 +- .../sps/BlockStorageMovementNeeded.java | 4 +- .../namenode/sps/IntraSPSNameNodeContext.java | 6 +- .../hdfs/server/namenode/sps/SPSPathIds.java | 70 ---- .../hdfs/server/namenode/sps/SPSService.java | 10 +- .../namenode/sps/StoragePolicySatisfier.java | 137 ++++--- .../sps/StoragePolicySatisfyManager.java | 399 +++++++++++++++++++ .../sps/ExternalStoragePolicySatisfier.java | 2 +- .../hadoop/hdfs/tools/StoragePolicyAdmin.java | 2 +- .../namenode/TestNameNodeReconfigure.java | 19 +- .../TestPersistentStoragePolicySatisfier.java | 3 +- .../TestStoragePolicySatisfierWithHA.java | 6 +- .../sps/TestStoragePolicySatisfier.java | 35 +- ...stStoragePolicySatisfierWithStripedFile.java | 6 +- .../sps/TestExternalStoragePolicySatisfier.java | 24 +- 33 files changed, 665 insertions(+), 604 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 471ab2c..b6f9bdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -3110,8 +3110,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } } - public boolean isStoragePolicySatisfierRunning() throws IOException { - return namenode.isStoragePolicySatisfierRunning(); + public boolean isInternalSatisfierRunning() throws IOException { + return namenode.isInternalSatisfierRunning(); } Tracer getTracer() { 
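The rename above is visible to client applications through DFSClient. A self-contained usage sketch, assuming the usual client configuration pointing at a running HDFS cluster; the RemoteException unwrapping mirrors what Mover does further down in this diff:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.ipc.RemoteException;

public class InternalSpsCheck {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration(); // picks up core-site/hdfs-site
    try (FileSystem fs = FileSystem.get(conf)) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      // Requires superuser privilege and an active NameNode (see the
      // NameNodeRpcServer hunk below); a Standby NN raises StandbyException.
      boolean running = dfs.getClient().isInternalSatisfierRunning();
      System.out.println("Internal SPS running: " + running);
    } catch (RemoteException e) {
      throw e.unwrapRemoteException(); // same idiom as Mover in this patch
    }
  }
}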
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 360fd63..5c51c22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -1759,12 +1759,12 @@ public interface ClientProtocol { void satisfyStoragePolicy(String path) throws IOException; /** - * Check if StoragePolicySatisfier is running. - * @return true if StoragePolicySatisfier is running + * Check if internal StoragePolicySatisfier is running. + * @return true if internal StoragePolicySatisfier is running * @throws IOException */ @Idempotent - boolean isStoragePolicySatisfierRunning() throws IOException; + boolean isInternalSatisfierRunning() throws IOException; /** * Check the storage policy satisfy status of the path for which http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index cdc8eac..683ccca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -150,8 +150,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSto import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto; @@ -301,8 +301,8 @@ public class ClientNamenodeProtocolTranslatorPB implements private final static GetErasureCodingCodecsRequestProto VOID_GET_EC_CODEC_REQUEST = GetErasureCodingCodecsRequestProto .newBuilder().build(); - private final static 
IsStoragePolicySatisfierRunningRequestProto - VOID_IS_SPS_RUNNING_REQUEST = IsStoragePolicySatisfierRunningRequestProto + private final static IsInternalSatisfierRunningRequestProto + VOID_IS_SPS_RUNNING_REQUEST = IsInternalSatisfierRunningRequestProto .newBuilder().build(); @@ -1912,10 +1912,10 @@ public class ClientNamenodeProtocolTranslatorPB implements } @Override - public boolean isStoragePolicySatisfierRunning() throws IOException { + public boolean isInternalSatisfierRunning() throws IOException { try { - IsStoragePolicySatisfierRunningResponseProto rep = - rpcProxy.isStoragePolicySatisfierRunning(null, + IsInternalSatisfierRunningResponseProto rep = + rpcProxy.isInternalSatisfierRunning(null, VOID_IS_SPS_RUNNING_REQUEST); return rep.getRunning(); } catch (ServiceException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 154891d..6002883 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -835,10 +835,10 @@ message SatisfyStoragePolicyResponseProto { } -message IsStoragePolicySatisfierRunningRequestProto { // no parameters +message IsInternalSatisfierRunningRequestProto { // no parameters } -message IsStoragePolicySatisfierRunningResponseProto { +message IsInternalSatisfierRunningResponseProto { required bool running = 1; } @@ -1045,8 +1045,8 @@ service ClientNamenodeProtocol { returns(ListOpenFilesResponseProto); rpc satisfyStoragePolicy(SatisfyStoragePolicyRequestProto) returns(SatisfyStoragePolicyResponseProto); - rpc isStoragePolicySatisfierRunning(IsStoragePolicySatisfierRunningRequestProto) - returns(IsStoragePolicySatisfierRunningResponseProto); + rpc isInternalSatisfierRunning(IsInternalSatisfierRunningRequestProto) + returns(IsInternalSatisfierRunningResponseProto); rpc checkStoragePolicySatisfyPathStatus(CheckStoragePolicySatisfyPathStatusRequestProto) returns(CheckStoragePolicySatisfyPathStatusResponseProto); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index c5458f0..d93f99d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -2498,7 +2498,7 @@ public class RouterRpcServer extends AbstractService } @Override - public boolean isStoragePolicySatisfierRunning() throws IOException { + public boolean isInternalSatisfierRunning() throws IOException { checkOperation(OperationCategory.READ, false); return false; } 
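A rename like this has to be applied consistently in four layers kept in sync by hand: the ClientProtocol Java interface, the .proto service definition, and the two translator classes. The client-side translator idiom is shown here as a method-level sketch rather than a standalone program, since the catch body is elided in the hunk above; the ServiceException-to-IOException unwrap via ProtobufHelper is the standard Hadoop PB pattern and an assumption about the elided lines:

  @Override
  public boolean isInternalSatisfierRunning() throws IOException {
    try {
      IsInternalSatisfierRunningResponseProto rep =
          rpcProxy.isInternalSatisfierRunning(null, VOID_IS_SPS_RUNNING_REQUEST);
      return rep.getRunning();
    } catch (ServiceException e) {
      // Standard unwrap: converts the PB ServiceException back into the
      // IOException/RemoteException that the ClientProtocol contract declares.
      throw ProtobufHelper.getRemoteException(e);
    }
  }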
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index c26599c..23f0478 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -53,7 +53,6 @@ import java.util.Collection; import java.util.Comparator; import java.util.Date; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -74,7 +73,6 @@ import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -1459,26 +1457,6 @@ public class DFSUtil { } /** - * Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from - * configuration. - * - * @param conf Configuration - * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION - */ - public static int getSPSWorkMultiplier(Configuration conf) { - int spsWorkMultiplier = conf - .getInt( - DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION, - DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT); - Preconditions.checkArgument( - (spsWorkMultiplier > 0), - DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION + - " = '" + spsWorkMultiplier + "' is invalid. " + - "It should be a positive, non-zero integer value."); - return spsWorkMultiplier; - } - - /** * Get SPNEGO keytab Key from configuration * * @param conf Configuration @@ -1738,43 +1716,4 @@ public class DFSUtil { } return id; } - - /** - * Remove the overlap between the expected types and the existing types. - * - * @param expected - * - Expected storage types list. - * @param existing - * - Existing storage types list. - * @param ignoreNonMovable - * ignore non-movable storage types by removing them from both - * expected and existing storage type list to prevent non-movable - * storage from being moved. - * @returns if the existing types or the expected types is empty after - * removing the overlap. 
- */ - public static boolean removeOverlapBetweenStorageTypes( - List<StorageType> expected, - List<StorageType> existing, boolean ignoreNonMovable) { - for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) { - final StorageType t = i.next(); - if (expected.remove(t)) { - i.remove(); - } - } - if (ignoreNonMovable) { - removeNonMovable(existing); - removeNonMovable(expected); - } - return expected.isEmpty() || existing.isEmpty(); - } - - private static void removeNonMovable(List<StorageType> types) { - for (Iterator<StorageType> i = types.iterator(); i.hasNext();) { - final StorageType t = i.next(); - if (!t.isMovable()) { - i.remove(); - } - } - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 09f7ce2..b0816cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -162,8 +162,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFile import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto; @@ -1865,14 +1865,14 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements } @Override - public IsStoragePolicySatisfierRunningResponseProto - isStoragePolicySatisfierRunning(RpcController controller, - IsStoragePolicySatisfierRunningRequestProto req) + public IsInternalSatisfierRunningResponseProto + isInternalSatisfierRunning(RpcController controller, + IsInternalSatisfierRunningRequestProto req) throws ServiceException { try { - boolean ret = server.isStoragePolicySatisfierRunning(); - IsStoragePolicySatisfierRunningResponseProto.Builder builder = - IsStoragePolicySatisfierRunningResponseProto.newBuilder(); + boolean ret = server.isInternalSatisfierRunning(); + IsInternalSatisfierRunningResponseProto.Builder builder = + IsInternalSatisfierRunningResponseProto.newBuilder(); builder.setRunning(ret); return 
builder.build(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 00a91a9..7e0c943 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -69,8 +69,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; @@ -94,12 +92,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector; -import org.apache.hadoop.hdfs.server.namenode.sps.SPSPathIds; -import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; -import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; @@ -435,11 +428,7 @@ public class BlockManager implements BlockStatsMXBean { private final BlockIdManager blockIdManager; /** For satisfying block storage policies. */ - private final StoragePolicySatisfier sps; - private final boolean storagePolicyEnabled; - private StoragePolicySatisfierMode spsMode; - private SPSPathIds spsPaths; - private final int spsOutstandingPathsLimit; + private final StoragePolicySatisfyManager spsManager; /** Minimum live replicas needed for the datanode to be transitioned * from ENTERING_MAINTENANCE to IN_MAINTENANCE. 
@@ -479,19 +468,10 @@ public class BlockManager implements BlockStatsMXBean { DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L); - // StoragePolicySatisfier(SPS) configs - storagePolicyEnabled = - conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, - DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT); - String spsModeVal = conf.get( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT); - spsOutstandingPathsLimit = conf.getInt( - DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY, - DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT); - spsMode = StoragePolicySatisfierMode.fromString(spsModeVal); - spsPaths = new SPSPathIds(); - sps = new StoragePolicySatisfier(conf); + + // sps manager manages the user invoked sps paths and does the movement. + spsManager = new StoragePolicySatisfyManager(conf, namesystem, this); + blockTokenSecretManager = createBlockTokenSecretManager(conf); providedStorageMap = new ProvidedStorageMap(namesystem, this, conf); @@ -719,7 +699,7 @@ public class BlockManager implements BlockStatsMXBean { } public void close() { - stopSPS(false); + getSPSManager().stop(); bmSafeMode.close(); try { redundancyThread.interrupt(); @@ -733,7 +713,7 @@ public class BlockManager implements BlockStatsMXBean { datanodeManager.close(); pendingReconstruction.stop(); blocksMap.close(); - stopSPSGracefully(); + getSPSManager().stopGracefully(); } /** @return the datanodeManager */ @@ -5038,222 +5018,9 @@ public class BlockManager implements BlockStatsMXBean { } /** - * Gets the storage policy satisfier instance. - * - * @return sps - */ - public StoragePolicySatisfier getStoragePolicySatisfier() { - return sps; - } - - /** - * Start storage policy satisfier service. - */ - public void startSPS() { - if (!(storagePolicyEnabled && spsMode != StoragePolicySatisfierMode.NONE)) { - LOG.info( - "Failed to start StoragePolicySatisfier " - + " as {} set to {} and {} set to {}.", - DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled, - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, spsMode); - return; - } else if (sps.isRunning()) { - LOG.info("Storage policy satisfier is already running" - + " as internal service."); - return; - } - // starting internal SPS service - if (spsMode == StoragePolicySatisfierMode.INTERNAL) { - sps.start(false, spsMode); - } - } - - /** - * Stop storage policy satisfier service. - * - * @param forceStop - * true represents that it should stop SPS service by clearing all - * pending SPS work - */ - public void stopSPS(boolean forceStop) { - if (!(storagePolicyEnabled - && (spsMode != StoragePolicySatisfierMode.NONE))) { - LOG.info("Storage policy satisfier is not enabled."); - return; - } else if (!sps.isRunning()) { - removeAllSPSPathIds(); - LOG.info("Storage policy satisfier is not running."); - return; - } - - sps.disable(forceStop); - } - - /** - * Enable storage policy satisfier by starting its service. 
- */ - public void enableInternalSPS() { - if (!storagePolicyEnabled){ - LOG.info("Failed to start StoragePolicySatisfier as {} set to {}.", - DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled); - return; - } - if (sps.isRunning()) { - LOG.info("Storage policy satisfier is already running as SPS mode:{}.", - spsMode); - return; - } - updateSPSMode(StoragePolicySatisfierMode.INTERNAL); - sps.init(new IntraSPSNameNodeContext(this.namesystem, this, sps), - new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(), - sps), - new IntraSPSNameNodeBlockMoveTaskHandler(this, this.namesystem), null); - sps.start(true, spsMode); - } - - - - /** - * Enable storage policy satisfier by starting its service. - */ - public void enableExternalSPS() { - if (!storagePolicyEnabled){ - LOG.info("Failed to start StoragePolicySatisfier as {} set to {}.", - DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled); - return; - } - if (spsMode == StoragePolicySatisfierMode.EXTERNAL) { - LOG.info("Storage policy satisfier is already enabled as SPS mode:{}.", - spsMode); - return; - } - updateSPSMode(StoragePolicySatisfierMode.EXTERNAL); - sps.stopGracefully(); - } - - private void updateSPSMode(StoragePolicySatisfierMode newSpsMode) { - LOG.debug("Updating SPS service status, current mode:{}, new mode:{}", - spsMode, newSpsMode); - spsMode = newSpsMode; - } - - /** - * Disable the storage policy satisfier by stopping its services. - */ - public void disableSPS() { - switch (spsMode) { - case NONE: - break; - case INTERNAL: - case EXTERNAL: - if (!sps.isRunning()) { - LOG.info("Storage policy satisfier is already stopped."); - } else { - LOG.info("Stopping StoragePolicySatisfier mode {}, as admin " - + "requested to stop it.", spsMode); - sps.disable(true); - } - removeAllSPSPathIds(); - break; - default: - // nothing - break; - } - updateSPSMode(StoragePolicySatisfierMode.NONE); - } - - /** - * Timed wait to stop storage policy satisfier daemon threads. - */ - public void stopSPSGracefully() { - removeAllSPSPathIds(); - sps.stopGracefully(); - } - /** - * @return True if storage policy satisfier running. - */ - public boolean isStoragePolicySatisfierRunning() { - return sps.isRunning(); - } - - /** - * @return status - * Storage policy satisfy status of the path. - * @throws IOException - */ - public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus( - String path) throws IOException { - if (spsMode != StoragePolicySatisfierMode.INTERNAL) { - LOG.debug("Satisfier is not running inside namenode, so status " - + "can't be returned."); - throw new IOException("Satisfier is not running inside namenode, " - + "so status can't be returned."); - } - return sps.checkStoragePolicySatisfyPathStatus(path); - } - - /** - * @return SPS service instance. - */ - public SPSService getSPSService() { - return this.sps; - } - - /** - * @return the next SPS path id, on which path users has invoked to satisfy - * storages. - */ - public Long getNextSPSPathId() { - return spsPaths.pollNext(); - } - - /** - * Verify that satisfier queue limit exceeds allowed outstanding limit. - */ - public void verifyOutstandingSPSPathQLimit() throws IOException { - long size = spsPaths.size(); - // Checking that the SPS call Q exceeds the allowed limit. 
- if (spsOutstandingPathsLimit - size <= 0) { - LOG.debug("Satisifer Q - outstanding limit:{}, current size:{}", - spsOutstandingPathsLimit, size); - throw new IOException("Outstanding satisfier queue limit: " - + spsOutstandingPathsLimit + " exceeded, try later!"); - } - } - - /** - * Removes the SPS path id from the list of sps paths. - */ - public void removeSPSPathId(long trackId) { - spsPaths.remove(trackId); - } - - /** - * Clean up all sps path ids. - */ - public void removeAllSPSPathIds() { - spsPaths.clear(); - } - - /** - * Adds the sps path to SPSPathIds list. - */ - public void addSPSPathId(long id) { - spsPaths.add(id); - } - - /** - * @return true if sps is running as an internal service or external service. - */ - public boolean isSPSEnabled() { - return spsMode == StoragePolicySatisfierMode.INTERNAL - || spsMode == StoragePolicySatisfierMode.EXTERNAL; - } - - /** - * @return sps service mode. + * @return sps manager. */ - public StoragePolicySatisfierMode getSPSMode() { - return spsMode; + public StoragePolicySatisfyManager getSPSManager() { + return spsManager; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index b09d908..24b948c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -211,8 +211,8 @@ public class DatanodeDescriptor extends DatanodeInfo { * A queue of blocks corresponding to trackID for moving its storage * placements by this datanode. */ - private final Queue<BlockMovingInfo> storageMovementBlocks = - new LinkedList<>(); + private final BlockQueue<BlockMovingInfo> storageMovementBlocks = + new BlockQueue<>(); private volatile boolean dropSPSWork = false; /* Variables for maintaining number of blocks scheduled to be written to @@ -369,6 +369,7 @@ public class DatanodeDescriptor extends DatanodeInfo { this.pendingCached.clear(); this.cached.clear(); this.pendingUncached.clear(); + this.storageMovementBlocks.clear(); } public int numBlocks() { @@ -1082,9 +1083,10 @@ public class DatanodeDescriptor extends DatanodeInfo { * - storage mismatched block info */ public void addBlocksToMoveStorage(BlockMovingInfo blkMovingInfo) { - synchronized (storageMovementBlocks) { - storageMovementBlocks.offer(blkMovingInfo); - } + storageMovementBlocks.offer(blkMovingInfo); + BlockManager.LOG + .debug("Adding block move task " + blkMovingInfo + " to " + getName() + + ", current queue size is " + storageMovementBlocks.size()); } /** @@ -1101,23 +1103,18 @@ public class DatanodeDescriptor extends DatanodeInfo { * total number of blocks which will be send to this datanode for * block movement. * - * @return block infos which needs to move its storage locations. + * @return block infos which needs to move its storage locations or null if + * there is no block infos to move. 
*/ public BlockMovingInfo[] getBlocksToMoveStorages(int numBlocksToMoveTasks) { - synchronized (storageMovementBlocks) { - List<BlockMovingInfo> blockMovingInfos = new ArrayList<>(); - for (; !storageMovementBlocks.isEmpty() - && numBlocksToMoveTasks > 0; numBlocksToMoveTasks--) { - blockMovingInfos.add(storageMovementBlocks.poll()); - } - BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos - .size()]; - blkMoveArray = blockMovingInfos.toArray(blkMoveArray); - if (blkMoveArray.length > 0) { - return blkMoveArray; - } + List<BlockMovingInfo> blockMovingInfos = storageMovementBlocks + .poll(numBlocksToMoveTasks); + if (blockMovingInfos == null || blockMovingInfos.size() <= 0) { return null; } + BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos + .size()]; + return blockMovingInfos.toArray(blkMoveArray); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index 1378de2..c6e2263 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -365,7 +365,7 @@ public interface HdfsServerConstants { String XATTR_ERASURECODING_POLICY = "system.hdfs.erasurecoding.policy"; - String XATTR_SATISFY_STORAGE_POLICY = "user.hdfs.sps.xattr"; + String XATTR_SATISFY_STORAGE_POLICY = "user.hdfs.sps"; Path MOVER_ID_PATH = new Path("/system/mover.id"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java index 42f2e93..af6137c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java @@ -75,9 +75,8 @@ public class StoragePolicySatisfyWorker { public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) { this.datanode = datanode; - - moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, - DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); + // Defaulting to 10. This is to minimise the number of move ops. 
+ moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 10); moveExecutor = initializeBlockMoverThreadPool(moverThreads); moverCompletionService = new ExecutorCompletionService<>(moveExecutor); handler = new BlocksMovementsStatusHandler(); @@ -127,21 +126,13 @@ public class StoragePolicySatisfyWorker { TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new Daemon.DaemonFactory() { private final AtomicInteger threadIndex = new AtomicInteger(0); + @Override public Thread newThread(Runnable r) { Thread t = super.newThread(r); t.setName("BlockMoverTask-" + threadIndex.getAndIncrement()); return t; } - }, new ThreadPoolExecutor.CallerRunsPolicy() { - @Override - public void rejectedExecution(Runnable runnable, - ThreadPoolExecutor e) { - LOG.info("Execution for block movement to satisfy storage policy" - + " got rejected, Executing in current thread"); - // will run in the current thread. - super.rejectedExecution(runnable, e); - } }); moverThreadPool.allowCoreThreadTimeOut(true); http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 2cc0e27..af5ab2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -661,7 +661,7 @@ public class Mover { boolean spsRunning; try { spsRunning = nnc.getDistributedFileSystem().getClient() - .isStoragePolicySatisfierRunning(); + .isInternalSatisfierRunning(); } catch (RemoteException e) { IOException cause = e.unwrapRemoteException(); if (cause instanceof StandbyException) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java index 5ffd6e8..45d6218 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import java.io.IOException; +import java.util.Arrays; import java.util.EnumSet; import java.util.List; @@ -75,24 +76,33 @@ final class FSDirSatisfyStoragePolicyOp { fsd.checkPathAccess(pc, iip, FsAction.WRITE); } INode inode = FSDirectory.resolveLastINode(iip); - if (inodeHasSatisfyXAttr(inode)) { - throw new IOException( - "Cannot request to call satisfy storage policy on path " + if (inode.isFile() && inode.asFile().numBlocks() == 0) { + if (NameNode.LOG.isInfoEnabled()) { + NameNode.LOG.info( + "Skipping satisfy storage policy on path:{} as " + + "this file doesn't have any blocks!", + inode.getFullPathName()); + } + } else if 
(inodeHasSatisfyXAttr(inode)) { + NameNode.LOG + .warn("Cannot request to call satisfy storage policy on path: " + inode.getFullPathName() + ", as this file/dir was already called for satisfying " + "storage policy."); - } - if (unprotectedSatisfyStoragePolicy(inode, fsd)) { + } else { XAttr satisfyXAttr = XAttrHelper .buildXAttr(XATTR_SATISFY_STORAGE_POLICY); - List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1); - xAttrs.add(satisfyXAttr); + List<XAttr> xAttrs = Arrays.asList(satisfyXAttr); List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode); List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs, xAttrs, EnumSet.of(XAttrSetFlag.CREATE)); XAttrStorage.updateINodeXAttrs(inode, newXAttrs, iip.getLatestSnapshotId()); fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache); + + // Adding directory in the pending queue, so FileInodeIdCollector + // process directory child in batch and recursively + fsd.getBlockManager().getSPSManager().addPathId(inode.getId()); } } finally { fsd.writeUnlock(); @@ -106,7 +116,7 @@ final class FSDirSatisfyStoragePolicyOp { } else { // Adding directory in the pending queue, so FileInodeIdCollector process // directory child in batch and recursively - fsd.getBlockManager().addSPSPathId(inode.getId()); + fsd.getBlockManager().getSPSManager().addPathId(inode.getId()); return true; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java index 709e270..7e22ae1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java @@ -90,7 +90,6 @@ class FSDirStatAndListingOp { * @param srcArg The string representation of the path to the file * @param resolveLink whether to throw UnresolvedLinkException * if src refers to a symlink - * @param needLocation if blockLocations need to be returned * * @param needLocation Include {@link LocatedBlocks} in result. * @param needBlockToken Include block tokens in {@link LocatedBlocks}. 
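For context on the xattr flow above: end users reach FSDirSatisfyStoragePolicyOp through DistributedFileSystem#satisfyStoragePolicy (or the corresponding option of the StoragePolicyAdmin tool, which this patch also touches per the diffstat). A minimal usage sketch, assuming an existing file whose storage policy was changed after write; the path is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class SatisfyPolicyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(conf)) {
      Path file = new Path("/data/archive/part-0001"); // hypothetical path
      dfs.setStoragePolicy(file, "COLD");
      // Stamps the user.hdfs.sps xattr (see FSDirSatisfyStoragePolicyOp
      // above) and queues the inode id with the SPS manager.
      dfs.satisfyStoragePolicy(file);
    }
  }
}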
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java index 459e697..1150a72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java @@ -209,7 +209,7 @@ class FSDirXAttrOp { for (XAttr xattr : toRemove) { if (XATTR_SATISFY_STORAGE_POLICY .equals(XAttrHelper.getPrefixedName(xattr))) { - fsd.getBlockManager().getStoragePolicySatisfier() + fsd.getBlockManager().getSPSManager().getInternalSPSService() .clearQueue(inode.getId()); break; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 2c9d627..6539b51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -1401,7 +1401,7 @@ public class FSDirectory implements Closeable { if (!inode.isSymlink()) { final XAttrFeature xaf = inode.getXAttrFeature(); addEncryptionZone((INodeWithAdditionalFields) inode, xaf); - if (namesystem.getBlockManager().isSPSEnabled()) { + if (namesystem.getBlockManager().getSPSManager().isEnabled()) { addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 15d0cb4..35ef5d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -259,10 +259,7 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector; -import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; +import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import 
org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; @@ -1295,13 +1292,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir, edekCacheLoaderDelay, edekCacheLoaderInterval); } - blockManager.getSPSService().init( - new IntraSPSNameNodeContext(this, blockManager, - blockManager.getSPSService()), - new IntraSPSNameNodeFileIdCollector(getFSDirectory(), - blockManager.getSPSService()), - new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this), null); - blockManager.startSPS(); + blockManager.getSPSManager().start(); } finally { startingActiveService = false; blockManager.checkSafeMode(); @@ -1332,7 +1323,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, writeLock(); try { if (blockManager != null) { - blockManager.stopSPS(false); + blockManager.getSPSManager().stop(); } stopSecretManager(); leaseManager.stopMonitor(); @@ -1372,7 +1363,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // Don't want to keep replication queues when not in Active. blockManager.clearQueues(); blockManager.setInitializedReplQueues(false); - blockManager.stopSPSGracefully(); + blockManager.getSPSManager().stopGracefully(); } } finally { writeUnlock("stopActiveServices"); @@ -2281,17 +2272,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, DFS_STORAGE_POLICY_ENABLED_KEY)); } // checks sps status - if (!blockManager.isSPSEnabled() - || (blockManager.getSPSMode() == StoragePolicySatisfierMode.INTERNAL - && !blockManager.getStoragePolicySatisfier().isRunning())) { + if (!blockManager.getSPSManager().isEnabled() || (blockManager + .getSPSManager().getMode() == StoragePolicySatisfierMode.INTERNAL + && !blockManager.getSPSManager().isInternalSatisfierRunning())) { throw new UnsupportedActionException( "Cannot request to satisfy storage policy " + "when storage policy satisfier feature has been disabled" + " by admin. Seek for an admin help to enable it " + "or use Mover tool."); } - // checks SPS Q has many outstanding requests. - blockManager.verifyOutstandingSPSPathQLimit(); + // checks SPS Q has many outstanding requests. It will throw IOException if + // the limit exceeds. + blockManager.getSPSManager().verifyOutstandingPathQLimit(); } /** @@ -3996,17 +3988,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } // Handle blocks movement results sent by the coordinator datanode. - StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); - if (sps != null) { - if (!sps.isRunning()) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Storage policy satisfier is not running. So, ignoring storage" - + " movement attempt finished block info sent by DN"); - } - } else { - sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished); + SPSService sps = blockManager.getSPSManager().getInternalSPSService(); + if (!sps.isRunning()) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Storage policy satisfier is not running. 
So, ignoring storage" + + " movement attempt finished block info sent by DN"); } + } else { + sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished); } //create ha status http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index bc8f54f..b199c72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -2044,7 +2044,7 @@ public class NameNode extends ReconfigurableBase implements } else if (property.equals(ipcClientRPCBackoffEnable)) { return reconfigureIPCBackoffEnabled(newVal); } else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY)) { - return reconfigureSPSEnabled(newVal, property); + return reconfigureSPSModeEvent(newVal, property); } else { throw new ReconfigurationException(property, newVal, getConf().get( property)); @@ -2128,39 +2128,27 @@ public class NameNode extends ReconfigurableBase implements return Boolean.toString(clientBackoffEnabled); } - String reconfigureSPSEnabled(String newVal, String property) + String reconfigureSPSModeEvent(String newVal, String property) throws ReconfigurationException { if (newVal == null || StoragePolicySatisfierMode.fromString(newVal) == null) { throw new ReconfigurationException(property, newVal, getConf().get(property), new HadoopIllegalArgumentException( - "For enabling or disabling storage policy satisfier, we must " - + "pass either none/internal/external string value only")); + "For enabling or disabling storage policy satisfier, must " + + "pass either internal/external/none string value only")); } if (!isActiveState()) { throw new ReconfigurationException(property, newVal, - getConf().get(property), new HadoopIllegalArgumentException( - "Enabling or disabling storage policy satisfier service on " - + state + " NameNode is not allowed")); + getConf().get(property), + new HadoopIllegalArgumentException( + "Enabling or disabling storage policy satisfier service on " + + state + " NameNode is not allowed")); } StoragePolicySatisfierMode mode = StoragePolicySatisfierMode .fromString(newVal); - switch(mode){ - case NONE: - namesystem.getBlockManager().disableSPS(); - break; - case INTERNAL: - namesystem.getBlockManager().enableInternalSPS(); - break; - case EXTERNAL: - namesystem.getBlockManager().enableExternalSPS(); - break; - default: - // nothing - break; - } + namesystem.getBlockManager().getSPSManager().changeModeEvent(mode); return newVal; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index d74dc9e..97f38c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -2536,15 +2536,15 @@ public class NameNodeRpcServer implements NamenodeProtocols { } @Override - public boolean isStoragePolicySatisfierRunning() throws IOException { + public boolean isInternalSatisfierRunning() throws IOException { checkNNStartup(); - String operationName = "isStoragePolicySatisfierRunning"; + String operationName = "isInternalSatisfierRunning"; namesystem.checkSuperuserPrivilege(operationName); if (nn.isStandbyState()) { throw new StandbyException("Not supported by Standby Namenode."); } - boolean isSPSRunning = - namesystem.getBlockManager().isStoragePolicySatisfierRunning(); + boolean isSPSRunning = namesystem.getBlockManager().getSPSManager() + .isInternalSatisfierRunning(); namesystem.logAuditEvent(true, operationName, null); return isSPSRunning; } @@ -2556,8 +2556,8 @@ public class NameNodeRpcServer implements NamenodeProtocols { if (nn.isStandbyState()) { throw new StandbyException("Not supported by Standby Namenode."); } - return namesystem.getBlockManager().checkStoragePolicySatisfyPathStatus( - path); + return namesystem.getBlockManager().getSPSManager() + .checkStoragePolicySatisfyPathStatus(path); } @Override @@ -2579,17 +2579,16 @@ public class NameNodeRpcServer implements NamenodeProtocols { if (nn.isStandbyState()) { throw new StandbyException("Not supported by Standby Namenode."); } - // Check that internal SPS service is running - if (namesystem.getBlockManager() - .getSPSMode() == StoragePolicySatisfierMode.INTERNAL - && namesystem.getBlockManager().getSPSService().isRunning()) { + // Check that SPS daemon service is running inside namenode + if (namesystem.getBlockManager().getSPSManager() + .getMode() == StoragePolicySatisfierMode.INTERNAL) { LOG.debug("SPS service is internally enabled and running inside " + "namenode, so external SPS is not allowed to fetch the path Ids"); throw new IOException("SPS service is internally enabled and running" + " inside namenode, so external SPS is not allowed to fetch" + " the path Ids"); } - return namesystem.getBlockManager().getNextSPSPathId(); + return namesystem.getBlockManager().getSPSManager().getNextPathId(); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java index 8a10183..c683a63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java @@ -311,7 +311,7 @@ public class BlockStorageMovementNeeded { if (Time.monotonicNow() - lastStatusCleanTime > statusClearanceElapsedTimeMs) { lastStatusCleanTime = Time.monotonicNow(); - cleanSpsStatus(); + cleanSPSStatus(); } startINodeId = null; // Current inode id successfully scanned. 
} @@ -333,7 +333,7 @@ public class BlockStorageMovementNeeded { } } - private synchronized void cleanSpsStatus() { + private synchronized void cleanSPSStatus() { for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it = spsStatus.entrySet().iterator(); it.hasNext();) { Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java index ff6cc21..495d1c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java @@ -178,17 +178,17 @@ public class IntraSPSNameNodeContext implements Context { @Override public Long getNextSPSPathId() { - return blockManager.getNextSPSPathId(); + return blockManager.getSPSManager().getNextPathId(); } @Override public void removeSPSPathId(long trackId) { - blockManager.removeSPSPathId(trackId); + blockManager.getSPSManager().removePathId(trackId); } @Override public void removeAllSPSPathIds() { - blockManager.removeAllSPSPathIds(); + blockManager.getSPSManager().removeAllPathIds(); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java deleted file mode 100644 index 6c0f8b2..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode.sps; - -import java.util.LinkedList; -import java.util.Queue; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * A class which holds the SPS invoked path ids. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class SPSPathIds { - - // List of pending dir to satisfy the policy - private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>(); - - /** - * Add the path id to queue. - */ - public synchronized void add(long pathId) { - spsDirsToBeTraveresed.add(pathId); - } - - /** - * Removes the path id. - */ - public synchronized void remove(long pathId) { - spsDirsToBeTraveresed.remove(pathId); - } - - /** - * Clears all path ids. - */ - public synchronized void clear() { - spsDirsToBeTraveresed.clear(); - } - - /** - * @return next path id available in queue. - */ - public synchronized Long pollNext() { - return spsDirsToBeTraveresed.poll(); - } - - /** - * @return the size of the queue. - */ - public synchronized long size() { - return spsDirsToBeTraveresed.size(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java index ceec3f3..da6e365 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java @@ -67,11 +67,12 @@ public interface SPSService { void stopGracefully(); /** - * Disable the SPS service. + * Stops the SPS service. * * @param forceStop + * true represents to clear all the sps path's hint, false otherwise. */ - void disable(boolean forceStop); + void stop(boolean forceStop); /** * Check whether StoragePolicySatisfier is running. @@ -106,6 +107,11 @@ public interface SPSService { int processingQueueSize(); /** + * Clear inodeId present in the processing queue. + */ + void clearQueue(long inodeId); + + /** * @return the configuration. 
*/ Configuration getConf(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java index 87faced..6b449aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -32,7 +32,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -60,6 +59,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * Setting storagePolicy on a file after the file write will only update the new @@ -145,7 +145,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { new BlockStorageMovementAttemptedItems(this, storageMovementNeeded, blockMovementListener); this.blockMoveTaskHandler = blockMovementTaskHandler; - this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(getConf()); + this.spsWorkMultiplier = getSPSWorkMultiplier(getConf()); this.blockMovementMaxRetry = getConf().getInt( DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT); @@ -163,8 +163,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable { serviceMode); return; } - isRunning = true; - this.spsMode = serviceMode; if (spsMode == StoragePolicySatisfierMode.INTERNAL && ctxt.isMoverRunning()) { isRunning = false; @@ -182,6 +180,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable { StringUtils.toLowerCase(spsMode.toString())); } + isRunning = true; + this.spsMode = serviceMode; // Ensure that all the previously submitted block movements(if any) have to // be stopped in all datanodes. 
addDropSPSWorkCommandsToAllDNs(); @@ -193,7 +193,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } @Override - public synchronized void disable(boolean forceStop) { + public synchronized void stop(boolean forceStop) { isRunning = false; if (storagePolicySatisfierThread == null) { return; @@ -214,19 +214,22 @@ public class StoragePolicySatisfier implements SPSService, Runnable { @Override public synchronized void stopGracefully() { if (isRunning) { - disable(true); + stop(false); } if (this.storageMovementsMonitor != null) { this.storageMovementsMonitor.stopGracefully(); } - if (storagePolicySatisfierThread == null) { - return; - } - try { - storagePolicySatisfierThread.join(3000); - } catch (InterruptedException ie) { + if (storagePolicySatisfierThread != null) { + try { + storagePolicySatisfierThread.join(3000); + } catch (InterruptedException ie) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted Exception while waiting to join sps thread," + + " ignoring it", ie); + } + } } } @@ -351,32 +354,26 @@ public class StoragePolicySatisfier implements SPSService, Runnable { Thread.sleep(3000); blockCount = 0L; } + } catch (IOException e) { + LOG.error("Exception during StoragePolicySatisfier execution - " + + "will continue next cycle", e); } catch (Throwable t) { - handleException(t); - } - } - } - - private void handleException(Throwable t) { - // double check to avoid entering into synchronized block. - if (isRunning) { - synchronized (this) { - if (isRunning) { - if (t instanceof InterruptedException) { + synchronized (this) { + if (isRunning) { isRunning = false; - LOG.info("Stopping StoragePolicySatisfier."); + if (t instanceof InterruptedException) { + LOG.info("Stopping StoragePolicySatisfier.", t); + } else { + LOG.error("StoragePolicySatisfier thread received " + + "runtime exception.", t); + } // Stopping monitor thread and clearing queues as well this.clearQueues(); this.storageMovementsMonitor.stopGracefully(); - } else { - LOG.error( - "StoragePolicySatisfier thread received runtime exception, " - + "ignoring", t); } } } } - return; } private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN( @@ -434,7 +431,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { List<StorageType> existing = new LinkedList<StorageType>( Arrays.asList(blockInfo.getStorageTypes())); - if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes, + if (!removeOverlapBetweenStorageTypes(expectedStorageTypes, existing, true)) { boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos, blockInfo, expectedStorageTypes, existing, blockInfo.getLocations(), @@ -499,7 +496,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { DatanodeInfo[] storages, DatanodeStorageReport[] liveDns, ErasureCodingPolicy ecPolicy) { boolean foundMatchingTargetNodesForBlock = true; - if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes, + if (!removeOverlapBetweenStorageTypes(expectedStorageTypes, existing, true)) { List<StorageTypeNodePair> sourceWithStorageMap = new ArrayList<StorageTypeNodePair>(); @@ -881,21 +878,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } /** - * Set file inode in queue for which storage movement needed for its blocks. - * - * @param inodeId - * - file inode/blockcollection id. 
- */ - public void satisfyStoragePolicy(Long inodeId) { - //For file startId and trackId is same - storageMovementNeeded.add(new ItemInfo(inodeId, inodeId)); - if (LOG.isDebugEnabled()) { - LOG.debug("Added track info for inode {} to block " - + "storageMovementNeeded queue", inodeId); - } - } - - /** - * Clear queues for given track id. */ public void clearQueue(long trackId) { @@ -958,6 +940,10 @@ public class StoragePolicySatisfier implements SPSService, Runnable { @Override public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) { storageMovementNeeded.add(trackInfo, scanCompleted); + if (LOG.isDebugEnabled()) { + LOG.debug("Added track info for inode {} to block " + + "storageMovementNeeded queue", trackInfo.getFileId()); + } } @Override @@ -993,4 +979,63 @@ public class StoragePolicySatisfier implements SPSService, Runnable { //TODO Add join here on SPS rpc server also storagePolicySatisfierThread.join(); } + + /** + * Remove the overlap between the expected types and the existing types. + * + * @param expected + * - Expected storage types list. + * @param existing + * - Existing storage types list. + * @param ignoreNonMovable + * ignore non-movable storage types by removing them from both + * expected and existing storage type list to prevent non-movable + * storage from being moved. + * @return true if the existing types or the expected types list is empty + * after removing the overlap. + */ + private static boolean removeOverlapBetweenStorageTypes( + List<StorageType> expected, + List<StorageType> existing, boolean ignoreNonMovable) { + for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) { + final StorageType t = i.next(); + if (expected.remove(t)) { + i.remove(); + } + } + if (ignoreNonMovable) { + removeNonMovable(existing); + removeNonMovable(expected); + } + return expected.isEmpty() || existing.isEmpty(); + } + + private static void removeNonMovable(List<StorageType> types) { + for (Iterator<StorageType> i = types.iterator(); i.hasNext();) { + final StorageType t = i.next(); + if (!t.isMovable()) { + i.remove(); + } + } + } + + /** + * Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from the + * configuration. + * + * @param conf Configuration + * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION + */ + private static int getSPSWorkMultiplier(Configuration conf) { + int spsWorkMultiplier = conf + .getInt( + DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION, + DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT); + Preconditions.checkArgument( + (spsWorkMultiplier > 0), + DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION + + " = '" + spsWorkMultiplier + "' is invalid. " + + "It should be a positive, non-zero integer value."); + return spsWorkMultiplier; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java new file mode 100644 index 0000000..5bdf6ae --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.sps; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.sps.ExternalStoragePolicySatisfier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class manages the satisfy storage policy invoked path ids and exposes + * methods to process these path ids. It maintains the sps mode + * (INTERNAL/EXTERNAL/NONE) configured by the administrator. + * + * <p> + * If the configured mode is {@link StoragePolicySatisfierMode#INTERNAL}, then + * it starts the internal sps daemon service inside the namenode and processes + * the sps invoked path ids to satisfy the storage policy. + * + * <p> + * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then + * it won't do anything; it just maintains the sps invoked path ids. The + * administrator must start the external sps service explicitly, which fetches + * the sps invoked path ids from the namenode, then does the necessary + * computations and block movement in order to satisfy the storage policy. + * Please refer to the {@link ExternalStoragePolicySatisfier} class to + * understand more about the external sps service functionality. + * + * <p> + * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then it + * disables the sps feature completely by clearing all the queued up sps path + * hints. + * + * This class is instantiated by the BlockManager.
+ */ +public class StoragePolicySatisfyManager { + private static final Logger LOG = LoggerFactory + .getLogger(StoragePolicySatisfyManager.class); + private final StoragePolicySatisfier spsService; + private final boolean storagePolicyEnabled; + private volatile StoragePolicySatisfierMode mode; + private final Queue<Long> pathsToBeTraveresed; + private final int outstandingPathsLimit; + private final Namesystem namesystem; + private final BlockManager blkMgr; + + public StoragePolicySatisfyManager(Configuration conf, Namesystem namesystem, + BlockManager blkMgr) { + // StoragePolicySatisfier(SPS) configs + storagePolicyEnabled = conf.getBoolean( + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT); + String modeVal = conf.get( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT); + outstandingPathsLimit = conf.getInt( + DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY, + DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT); + mode = StoragePolicySatisfierMode.fromString(modeVal); + pathsToBeTraveresed = new LinkedList<Long>(); + // instantiate the SPS service; this just keeps the config reference and + // does not start any supporting threads. + spsService = new StoragePolicySatisfier(conf); + this.namesystem = namesystem; + this.blkMgr = blkMgr; + } + + /** + * This method performs the following logic based on the configured sps mode: + * + * <p> + * If the configured mode is {@link StoragePolicySatisfierMode#INTERNAL}, then + * it starts the internal daemon service inside the namenode. + * + * <p> + * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then + * it won't do anything. The administrator must start the external sps service + * explicitly. + * + * <p> + * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then the + * service is disabled and won't do any action. + */ + public void start() { + if (!storagePolicyEnabled) { + LOG.info("Disabling StoragePolicySatisfier service as {} set to {}.", + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled); + return; + } + + switch (mode) { + case INTERNAL: + if (spsService.isRunning()) { + LOG.info("Storage policy satisfier is already running" + + " as internal daemon service inside namenode."); + return; + } + // starts internal daemon service inside namenode + spsService.init( + new IntraSPSNameNodeContext(namesystem, blkMgr, spsService), + new IntraSPSNameNodeFileIdCollector(namesystem.getFSDirectory(), + spsService), + new IntraSPSNameNodeBlockMoveTaskHandler(blkMgr, namesystem), null); + spsService.start(false, mode); + break; + case EXTERNAL: + LOG.info("Storage policy satisfier is configured as external, " + + "please start external sps service explicitly to satisfy policy"); + break; + case NONE: + LOG.info("Storage policy satisfier is disabled"); + break; + default: + LOG.info("Given mode: {} is invalid", mode); + break; + } + } + + /** + * This method performs the following logic based on the configured sps mode: + * + * <p> + * If the configured mode is {@link StoragePolicySatisfierMode#INTERNAL}, then + * it stops the internal daemon service inside the namenode. + * + * <p> + * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then + * it won't do anything. The administrator must stop the external sps service + * explicitly, if needed. + * + * <p> + * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then the + * service is disabled and won't do any action.
+ */ + public void stop() { + if (!storagePolicyEnabled) { + if (LOG.isDebugEnabled()) { + LOG.debug("Storage policy is not enabled, ignoring"); + } + return; + } + + switch (mode) { + case INTERNAL: + removeAllPathIds(); + if (!spsService.isRunning()) { + LOG.info("Internal storage policy satisfier daemon service" + + " is not running"); + return; + } + // stops internal daemon service running inside namenode + spsService.stop(false); + break; + case EXTERNAL: + removeAllPathIds(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Storage policy satisfier service is running outside namenode" + + ", ignoring"); + } + break; + case NONE: + if (LOG.isDebugEnabled()) { + LOG.debug("Storage policy satisfier is not enabled, ignoring"); + } + break; + default: + if (LOG.isDebugEnabled()) { + LOG.debug("Invalid mode:{}, ignoring", mode); + } + break; + } + } + + /** + * Sets the new sps mode. If the new mode is internal, then it starts the + * internal sps service inside the namenode. If the new mode is external, then + * it stops the internal sps service running (if any) inside the namenode. If + * the new mode is none, then it disables the sps feature completely by + * clearing all the queued up sps path hints. + */ + public void changeModeEvent(StoragePolicySatisfierMode newMode) { + if (!storagePolicyEnabled) { + LOG.info("Failed to change storage policy satisfier as {} set to {}.", + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled); + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Updating SPS service status, current mode:{}, new mode:{}", + mode, newMode); + } + + switch (newMode) { + case INTERNAL: + if (spsService.isRunning()) { + LOG.info("Storage policy satisfier is already running as {} mode.", + mode); + return; + } + spsService.init( + new IntraSPSNameNodeContext(this.namesystem, this.blkMgr, spsService), + new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(), + spsService), + new IntraSPSNameNodeBlockMoveTaskHandler(this.blkMgr, + this.namesystem), + null); + spsService.start(true, newMode); + break; + case EXTERNAL: + if (mode == newMode) { + LOG.info("Storage policy satisfier is already in mode:{}," + + " so ignoring change mode event.", newMode); + return; + } + spsService.stopGracefully(); + break; + case NONE: + if (mode == newMode) { + LOG.info("Storage policy satisfier is already disabled, mode:{}" + + " so ignoring change mode event.", newMode); + return; + } + LOG.info("Disabling StoragePolicySatisfier, mode:{}", newMode); + spsService.stop(true); + removeAllPathIds(); + break; + default: + if (LOG.isDebugEnabled()) { + LOG.debug("Given mode: {} is invalid", newMode); + } + break; + } + + // update sps mode + mode = newMode; + } + + /** + * This method performs the following logic based on the configured sps mode: + * + * <p> + * If the configured mode is {@link StoragePolicySatisfierMode#INTERNAL}, then + * it does a timed wait to stop the internal storage policy satisfier daemon + * threads. + * + * <p> + * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then + * it won't do anything, just ignore it. + * + * <p> + * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then the + * service is disabled. It won't do any action, just ignore it.
+ */ + public void stopGracefully() { + switch (mode) { + case INTERNAL: + spsService.stopGracefully(); + break; + case EXTERNAL: + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring, StoragePolicySatisfier feature is running" + + " outside namenode"); + } + break; + case NONE: + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring, StoragePolicySatisfier feature is disabled"); + } + break; + default: + if (LOG.isDebugEnabled()) { + LOG.debug("Invalid mode:{}", mode); + } + break; + } + } + + /** + * @return true if the internal storage policy satisfier daemon is running, + * false otherwise. + */ + public boolean isInternalSatisfierRunning() { + return spsService.isRunning(); + } + + /** + * @return internal SPS service instance. + */ + public SPSService getInternalSPSService() { + return this.spsService; + } + + /** + * @return the storage policy satisfy status of the path. It is supported + * only for the internal sps daemon service. + * @throws IOException + * if the satisfier is not running inside the namenode. + */ + public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus( + String path) throws IOException { + if (mode != StoragePolicySatisfierMode.INTERNAL) { + LOG.debug("Satisfier is not running inside namenode, so status " + + "can't be returned."); + throw new IOException("Satisfier is not running inside namenode, " + + "so status can't be returned."); + } + return spsService.checkStoragePolicySatisfyPathStatus(path); + } + + /** + * @return the next SPS path id on which a user has invoked satisfy storage + * policy. + */ + public Long getNextPathId() { + synchronized (pathsToBeTraveresed) { + return pathsToBeTraveresed.poll(); + } + } + + /** + * Verifies that the satisfier queue size has not exceeded the allowed + * outstanding limit. + */ + public void verifyOutstandingPathQLimit() throws IOException { + long size = pathsToBeTraveresed.size(); + // Check whether the SPS call Q has reached the allowed limit. + if (outstandingPathsLimit - size <= 0) { + LOG.debug("Satisfier Q - outstanding limit:{}, current size:{}", + outstandingPathsLimit, size); + throw new IOException("Outstanding satisfier queue limit: " + + outstandingPathsLimit + " exceeded, try later!"); + } + } + + /** + * Removes the SPS path id from the list of sps paths. + */ + public void removePathId(long trackId) { + synchronized (pathsToBeTraveresed) { + pathsToBeTraveresed.remove(trackId); + } + } + + /** + * Cleans up all sps path ids. + */ + public void removeAllPathIds() { + synchronized (pathsToBeTraveresed) { + pathsToBeTraveresed.clear(); + } + } + + /** + * Adds the sps path id to the queue of paths to be traversed. + */ + public void addPathId(long id) { + synchronized (pathsToBeTraveresed) { + pathsToBeTraveresed.add(id); + } + } + + /** + * @return true if sps is configured as an internal service or external + * service, false otherwise. + */ + public boolean isEnabled() { + return mode == StoragePolicySatisfierMode.INTERNAL + || mode == StoragePolicySatisfierMode.EXTERNAL; + } + + /** + * @return sps service mode.
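+ *         It reflects the most recent {@code changeModeEvent}, e.g. it
+ *         returns {@code NONE} once the feature has been disabled.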
+ */ + public StoragePolicySatisfierMode getMode() { + return mode; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java index 59935b6..33448db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java @@ -73,7 +73,7 @@ public final class ExternalStoragePolicySatisfier { boolean spsRunning; spsRunning = nnc.getDistributedFileSystem().getClient() - .isStoragePolicySatisfierRunning(); + .isInternalSatisfierRunning(); if (spsRunning) { throw new RuntimeException( "Startup failed due to StoragePolicySatisfier" http://git-wip-us.apache.org/repos/asf/hadoop/blob/87e125a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java index 3a2ad48..d8392fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java @@ -374,7 +374,7 @@ public class StoragePolicyAdmin extends Configured implements Tool { } final DistributedFileSystem dfs = AdminHelper.getDFS(conf); try { - if(dfs.getClient().isStoragePolicySatisfierRunning()){ + if(dfs.getClient().isInternalSatisfierRunning()){ System.out.println("yes"); }else{ System.out.println("no");
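Usage note: the sketch below mirrors the renamed client-side check shown in the StoragePolicyAdmin hunk above. It is a minimal, hypothetical example (the class name SpsStartupCheck is invented here, and it assumes fs.defaultFS in the loaded Configuration points at the target HDFS cluster); it is not code from this patch.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class SpsStartupCheck {
  public static void main(String[] args) throws IOException {
    // Assumption: fs.defaultFS points at the target HDFS cluster.
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    // Renamed in this patch: asks the active NameNode whether the
    // internal satisfier daemon is running.
    boolean internalSpsRunning = dfs.getClient().isInternalSatisfierRunning();
    System.out.println(internalSpsRunning ? "yes" : "no");
  }
}

An external satisfier deployment can use the same check, as ExternalStoragePolicySatisfier does above, to refuse startup while the internal daemon is still active.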