HDFS-12570: [SPS]: Refactor Co-ordinator datanode logic to track the block storage movements. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2666d51f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2666d51f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2666d51f Branch: refs/heads/HDFS-10285 Commit: 2666d51fb86201fd3c64d8de52ef39c73c93c203 Parents: a275162 Author: Uma Maheswara Rao G <[email protected]> Authored: Thu Oct 12 17:17:51 2017 -0700 Committer: Uma Maheswara Rao G <[email protected]> Committed: Thu Oct 12 17:17:51 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 54 +-- .../DatanodeProtocolClientSideTranslatorPB.java | 12 +- .../DatanodeProtocolServerSideTranslatorPB.java | 4 +- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 150 +++----- .../blockmanagement/DatanodeDescriptor.java | 50 ++- .../server/blockmanagement/DatanodeManager.java | 104 ++++-- .../hdfs/server/datanode/BPOfferService.java | 3 +- .../hdfs/server/datanode/BPServiceActor.java | 33 +- .../datanode/BlockStorageMovementTracker.java | 80 ++--- .../datanode/StoragePolicySatisfyWorker.java | 214 ++++-------- .../BlockStorageMovementAttemptedItems.java | 299 ++++------------ .../BlockStorageMovementInfosBatch.java | 61 ---- .../hdfs/server/namenode/FSNamesystem.java | 11 +- .../hdfs/server/namenode/NameNodeRpcServer.java | 7 +- .../server/namenode/StoragePolicySatisfier.java | 343 ++++++++++--------- .../protocol/BlockStorageMovementCommand.java | 99 ++---- .../BlocksStorageMoveAttemptFinished.java | 48 +++ .../protocol/BlocksStorageMovementResult.java | 74 ---- .../hdfs/server/protocol/DatanodeProtocol.java | 5 +- .../src/main/proto/DatanodeProtocol.proto | 30 +- .../src/main/resources/hdfs-default.xml | 21 +- .../src/site/markdown/ArchivalStorage.md | 6 +- .../TestNameNodePrunesMissingStorages.java | 5 +- .../datanode/InternalDataNodeTestUtils.java | 4 +- .../server/datanode/TestBPOfferService.java | 4 +- 
.../hdfs/server/datanode/TestBlockRecovery.java | 4 +- .../server/datanode/TestDataNodeLifeline.java | 6 +- .../TestDatanodeProtocolRetryPolicy.java | 4 +- .../server/datanode/TestFsDatasetCache.java | 4 +- .../TestStoragePolicySatisfyWorker.java | 52 ++- .../hdfs/server/datanode/TestStorageReport.java | 4 +- .../server/namenode/NNThroughputBenchmark.java | 6 +- .../hdfs/server/namenode/NameNodeAdapter.java | 4 +- .../TestBlockStorageMovementAttemptedItems.java | 145 ++++---- .../hdfs/server/namenode/TestDeadDatanode.java | 4 +- .../namenode/TestStoragePolicySatisfier.java | 115 ++++++- ...stStoragePolicySatisfierWithStripedFile.java | 20 +- 37 files changed, 931 insertions(+), 1158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index e74c90c..0686e83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFau import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; import org.apache.hadoop.http.HttpConfig; -/** +/** * This class contains constants for configuration keys and default values * used in hdfs. 
*/ @@ -306,7 +306,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC = "dfs.namenode.lazypersist.file.scrub.interval.sec"; public static final int DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT = 5 * 60; - + public static final String DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush"; public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false; @@ -366,11 +366,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // The default value of the time interval for marking datanodes as stale public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval"; public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s - // The stale interval cannot be too small since otherwise this may cause too frequent churn on stale states. - // This value uses the times of heartbeat interval to define the minimum value for stale interval. + // The stale interval cannot be too small since otherwise this may cause too frequent churn on stale states. + // This value uses the times of heartbeat interval to define the minimum value for stale interval. public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval"; public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s - + // When the percentage of stale datanodes reaches this ratio, // allow writing to stale nodes to prevent hotspots. 
public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio"; @@ -556,11 +556,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY = "dfs.storage.policy.satisfier.recheck.timeout.millis"; public static final int DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT = - 5 * 60 * 1000; + 1 * 60 * 1000; public static final String DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY = "dfs.storage.policy.satisfier.self.retry.timeout.millis"; public static final int DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT = - 20 * 60 * 1000; + 5 * 60 * 1000; + public static final String DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY = + "dfs.storage.policy.satisfier.low.max-streams.preference"; + public static final boolean DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT = + false; public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address"; public static final int DFS_DATANODE_DEFAULT_PORT = 9866; @@ -930,7 +934,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_JOURNALNODE_RPC_ADDRESS_KEY = "dfs.journalnode.rpc-address"; public static final int DFS_JOURNALNODE_RPC_PORT_DEFAULT = 8485; public static final String DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_RPC_PORT_DEFAULT; - + public static final String DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address"; public static final int DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480; public static final String DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTP_PORT_DEFAULT; @@ -951,7 +955,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // Journal-node related configs for the client side. 
public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb"; public static final int DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT = 10; - + // Quorum-journal timeouts for various operations. Unlikely to need // to be tweaked, but configurable just in case. public static final String DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.start-segment.timeout.ms"; @@ -970,17 +974,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000; public static final int DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000; public static final int DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000; - + public static final String DFS_MAX_NUM_BLOCKS_TO_LOG_KEY = "dfs.namenode.max-num-blocks-to-log"; public static final long DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT = 1000l; - + public static final String DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY = "dfs.namenode.enable.retrycache"; public static final boolean DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT = true; public static final String DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY = "dfs.namenode.retrycache.expirytime.millis"; public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent"; public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f; - + // Hidden configuration undocumented in hdfs-site. 
xml // Timeout to wait for block receiver and responder thread to stop public static final String DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY = "dfs.datanode.xceiver.stop.timeout.millis"; @@ -1025,7 +1029,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { = HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_DEFAULT; // Handling unresolved DN topology mapping - public static final String DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY = + public static final String DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY = "dfs.namenode.reject-unresolved-dn-topology-mapping"; public static final boolean DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT = false; @@ -1125,13 +1129,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.use.dfs.network.topology"; public static final boolean DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT = true; - // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY = HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY; @Deprecated public static final boolean DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT - = HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT; + = HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT; @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_SPEC_KEY = HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY; @@ -1163,7 +1167,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_CLIENT_RETRY_WINDOW_BASE_DEFAULT = HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT; - // dfs.client.failover confs are moved to HdfsClientConfigKeys.Failover + // dfs.client.failover confs are moved to HdfsClientConfigKeys.Failover @Deprecated public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX; @@ -1197,8 +1201,8 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys { @Deprecated public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT; - - // dfs.client.write confs are moved to HdfsClientConfigKeys.Write + + // dfs.client.write confs are moved to HdfsClientConfigKeys.Write @Deprecated public static final String DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY = HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_KEY; @@ -1236,7 +1240,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT = HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT; - // dfs.client.block.write confs are moved to HdfsClientConfigKeys.BlockWrite + // dfs.client.block.write confs are moved to HdfsClientConfigKeys.BlockWrite @Deprecated public static final String DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY = HdfsClientConfigKeys.BlockWrite.RETRIES_KEY; @@ -1274,13 +1278,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT = HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT; - // dfs.client.read confs are moved to HdfsClientConfigKeys.Read + // dfs.client.read confs are moved to HdfsClientConfigKeys.Read @Deprecated public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY - = HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY; + = HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY; @Deprecated public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY - = HdfsClientConfigKeys.Read.ShortCircuit.KEY; + = HdfsClientConfigKeys.Read.ShortCircuit.KEY; @Deprecated public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT; @@ -1309,7 +1313,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long 
DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT; - // dfs.client.mmap confs are moved to HdfsClientConfigKeys.Mmap + // dfs.client.mmap confs are moved to HdfsClientConfigKeys.Mmap @Deprecated public static final String DFS_CLIENT_MMAP_ENABLED = HdfsClientConfigKeys.Mmap.ENABLED_KEY; @@ -1335,7 +1339,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT = HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_DEFAULT; - // dfs.client.short.circuit confs are moved to HdfsClientConfigKeys.ShortCircuit + // dfs.client.short.circuit confs are moved to HdfsClientConfigKeys.ShortCircuit @Deprecated public static final String DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS = HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY; @@ -1343,7 +1347,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT = HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT; - // dfs.client.hedged.read confs are moved to HdfsClientConfigKeys.HedgedRead + // dfs.client.hedged.read confs are moved to HdfsClientConfigKeys.HedgedRead @Deprecated public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS = HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY; http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 9dd87d0..dcc0705 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -140,7 +140,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, @Nonnull SlowDiskReports slowDisks, - BlocksStorageMovementResult[] blksMovementResults) throws IOException { + BlocksStorageMoveAttemptFinished storageMovementFinishedBlks) + throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) @@ -165,8 +166,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements } // Adding blocks movement results to the heart beat request. 
- builder.addAllBlksMovementResults( - PBHelper.convertBlksMovResults(blksMovementResults)); + if (storageMovementFinishedBlks != null + && storageMovementFinishedBlks.getBlocks() != null) { + builder.setStorageMoveAttemptFinishedBlks( + PBHelper.convertBlksMovReport(storageMovementFinishedBlks)); + } HeartbeatResponseProto resp; try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index 40458ef..b5bb80a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -123,8 +123,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements volumeFailureSummary, request.getRequestFullBlockReportLease(), PBHelper.convertSlowPeerInfo(request.getSlowPeersList()), PBHelper.convertSlowDiskInfo(request.getSlowDisksList()), - PBHelper.convertBlksMovResults( - request.getBlksMovementResultsList())); + PBHelper.convertBlksMovReport( + request.getStorageMoveAttemptFinishedBlks())); } catch (IOException e) { throw new ServiceException(e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 5044c0b..d329f9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBand import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockMovingInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto; @@ -54,11 +55,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerRepo import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto; -import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto; -import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMoveAttemptFinishedProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; 
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto; @@ -100,8 +101,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; @@ -967,59 +967,27 @@ public class PBHelper { return SlowDiskReports.create(slowDisksMap); } - public static BlocksStorageMovementResult[] convertBlksMovResults( - List<BlocksStorageMovementResultProto> protos) { - BlocksStorageMovementResult[] results = - new BlocksStorageMovementResult[protos.size()]; - for (int i = 0; i < protos.size(); i++) { - BlocksStorageMovementResultProto resultProto = protos.get(i); - BlocksStorageMovementResult.Status status; - switch (resultProto.getStatus()) { - case SUCCESS: - status = Status.SUCCESS; - break; - case FAILURE: - status = Status.FAILURE; - break; - case IN_PROGRESS: - status = Status.IN_PROGRESS; - break; - default: - throw new AssertionError("Unknown status: " + resultProto.getStatus()); - } - results[i] = new BlocksStorageMovementResult(resultProto.getTrackID(), - status); + public static 
BlocksStorageMoveAttemptFinished convertBlksMovReport( + BlocksStorageMoveAttemptFinishedProto proto) { + + List<BlockProto> blocksList = proto.getBlocksList(); + Block[] blocks = new Block[blocksList.size()]; + for (int i = 0; i < blocksList.size(); i++) { + BlockProto blkProto = blocksList.get(i); + blocks[i] = PBHelperClient.convert(blkProto); } - return results; + return new BlocksStorageMoveAttemptFinished(blocks); } - public static List<BlocksStorageMovementResultProto> convertBlksMovResults( - BlocksStorageMovementResult[] blocksMovementResults) { - List<BlocksStorageMovementResultProto> blocksMovementResultsProto = - new ArrayList<>(); - BlocksStorageMovementResultProto.Builder builder = - BlocksStorageMovementResultProto.newBuilder(); - for (int i = 0; i < blocksMovementResults.length; i++) { - BlocksStorageMovementResult report = blocksMovementResults[i]; - builder.setTrackID(report.getTrackId()); - BlocksStorageMovementResultProto.Status status; - switch (report.getStatus()) { - case SUCCESS: - status = BlocksStorageMovementResultProto.Status.SUCCESS; - break; - case FAILURE: - status = BlocksStorageMovementResultProto.Status.FAILURE; - break; - case IN_PROGRESS: - status = BlocksStorageMovementResultProto.Status.IN_PROGRESS; - break; - default: - throw new AssertionError("Unknown status: " + report.getStatus()); - } - builder.setStatus(status); - blocksMovementResultsProto.add(builder.build()); + public static BlocksStorageMoveAttemptFinishedProto convertBlksMovReport( + BlocksStorageMoveAttemptFinished blocksMoveAttemptFinished) { + BlocksStorageMoveAttemptFinishedProto.Builder builder = + BlocksStorageMoveAttemptFinishedProto.newBuilder(); + Block[] blocks = blocksMoveAttemptFinished.getBlocks(); + for (Block block : blocks) { + builder.addBlocks(PBHelperClient.convert(block)); } - return blocksMovementResultsProto; + return builder.build(); } public static JournalInfo convert(JournalInfoProto info) { @@ -1183,34 +1151,34 @@ public class PBHelper { 
BlockStorageMovementCommandProto.Builder builder = BlockStorageMovementCommandProto.newBuilder(); - builder.setTrackID(blkStorageMovementCmd.getTrackID()); builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId()); Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd .getBlockMovingTasks(); for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { - builder.addBlockStorageMovement( - convertBlockMovingInfo(blkMovingInfo)); + builder.addBlockMovingInfo(convertBlockMovingInfo(blkMovingInfo)); } return builder.build(); } - private static BlockStorageMovementProto convertBlockMovingInfo( + private static BlockMovingInfoProto convertBlockMovingInfo( BlockMovingInfo blkMovingInfo) { - BlockStorageMovementProto.Builder builder = BlockStorageMovementProto + BlockMovingInfoProto.Builder builder = BlockMovingInfoProto .newBuilder(); builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock())); - DatanodeInfo[] sourceDnInfos = blkMovingInfo.getSources(); - builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos)); + DatanodeInfo sourceDnInfo = blkMovingInfo.getSource(); + builder.setSourceDnInfo(PBHelperClient.convert(sourceDnInfo)); - DatanodeInfo[] targetDnInfos = blkMovingInfo.getTargets(); - builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos)); + DatanodeInfo targetDnInfo = blkMovingInfo.getTarget(); + builder.setTargetDnInfo(PBHelperClient.convert(targetDnInfo)); - StorageType[] sourceStorageTypes = blkMovingInfo.getSourceStorageTypes(); - builder.setSourceStorageTypes(convertStorageTypesProto(sourceStorageTypes)); + StorageType sourceStorageType = blkMovingInfo.getSourceStorageType(); + builder.setSourceStorageType( + PBHelperClient.convertStorageType(sourceStorageType)); - StorageType[] targetStorageTypes = blkMovingInfo.getTargetStorageTypes(); - builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes)); + StorageType targetStorageType = blkMovingInfo.getTargetStorageType(); + builder.setTargetStorageType( + 
PBHelperClient.convertStorageType(targetStorageType)); return builder.build(); } @@ -1218,42 +1186,38 @@ public class PBHelper { private static DatanodeCommand convert( BlockStorageMovementCommandProto blkStorageMovementCmdProto) { Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>(); - List<BlockStorageMovementProto> blkSPSatisfyList = - blkStorageMovementCmdProto.getBlockStorageMovementList(); - for (BlockStorageMovementProto blkSPSatisfy : blkSPSatisfyList) { + List<BlockMovingInfoProto> blkSPSatisfyList = + blkStorageMovementCmdProto.getBlockMovingInfoList(); + for (BlockMovingInfoProto blkSPSatisfy : blkSPSatisfyList) { blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy)); } return new BlockStorageMovementCommand( DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, - blkStorageMovementCmdProto.getTrackID(), blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos); } private static BlockMovingInfo convertBlockMovingInfo( - BlockStorageMovementProto blockStoragePolicySatisfyProto) { - BlockProto blockProto = blockStoragePolicySatisfyProto.getBlock(); + BlockMovingInfoProto blockStorageMovingInfoProto) { + BlockProto blockProto = blockStorageMovingInfoProto.getBlock(); Block block = PBHelperClient.convert(blockProto); - DatanodeInfosProto sourceDnInfosProto = blockStoragePolicySatisfyProto - .getSourceDnInfos(); - DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto); - - DatanodeInfosProto targetDnInfosProto = blockStoragePolicySatisfyProto - .getTargetDnInfos(); - DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto); - - StorageTypesProto srcStorageTypesProto = blockStoragePolicySatisfyProto - .getSourceStorageTypes(); - StorageType[] srcStorageTypes = PBHelperClient.convertStorageTypes( - srcStorageTypesProto.getStorageTypesList(), - srcStorageTypesProto.getStorageTypesList().size()); - - StorageTypesProto targetStorageTypesProto = blockStoragePolicySatisfyProto - .getTargetStorageTypes(); - 
StorageType[] targetStorageTypes = PBHelperClient.convertStorageTypes( - targetStorageTypesProto.getStorageTypesList(), - targetStorageTypesProto.getStorageTypesList().size()); - return new BlockMovingInfo(block, sourceDnInfos, targetDnInfos, - srcStorageTypes, targetStorageTypes); + DatanodeInfoProto sourceDnInfoProto = blockStorageMovingInfoProto + .getSourceDnInfo(); + DatanodeInfo sourceDnInfo = PBHelperClient.convert(sourceDnInfoProto); + + DatanodeInfoProto targetDnInfoProto = blockStorageMovingInfoProto + .getTargetDnInfo(); + DatanodeInfo targetDnInfo = PBHelperClient.convert(targetDnInfoProto); + StorageTypeProto srcStorageTypeProto = blockStorageMovingInfoProto + .getSourceStorageType(); + StorageType srcStorageType = PBHelperClient + .convertStorageType(srcStorageTypeProto); + + StorageTypeProto targetStorageTypeProto = blockStorageMovingInfoProto + .getTargetStorageType(); + StorageType targetStorageType = PBHelperClient + .convertStorageType(targetStorageTypeProto); + return new BlockMovingInfo(block, sourceDnInfo, targetDnInfo, + srcStorageType, targetStorageType); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 95cb3a5..9210e59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import 
org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; @@ -211,7 +210,7 @@ public class DatanodeDescriptor extends DatanodeInfo { * A queue of blocks corresponding to trackID for moving its storage * placements by this datanode. */ - private final Queue<BlockStorageMovementInfosBatch> storageMovementBlocks = + private final Queue<BlockMovingInfo> storageMovementBlocks = new LinkedList<>(); private volatile boolean dropSPSWork = false; @@ -1029,30 +1028,45 @@ public class DatanodeDescriptor extends DatanodeInfo { /** * Add the block infos which needs to move its storage locations. * - * @param trackID - * - unique identifier which will be used for tracking the given set - * of blocks movement completion. - * @param storageMismatchedBlocks - * - storage mismatched block infos + * @param blkMovingInfo + * - storage mismatched block info */ - public void addBlocksToMoveStorage(long trackID, - List<BlockMovingInfo> storageMismatchedBlocks) { + public void addBlocksToMoveStorage(BlockMovingInfo blkMovingInfo) { synchronized (storageMovementBlocks) { - storageMovementBlocks.offer( - new BlockStorageMovementInfosBatch(trackID, storageMismatchedBlocks)); + storageMovementBlocks.offer(blkMovingInfo); } } /** - * @return block infos which needs to move its storage locations. This returns - * list of blocks under one trackId. + * Return the number of blocks queued up for movement. 
*/ - public BlockStorageMovementInfosBatch getBlocksToMoveStorages() { + public int getNumberOfBlocksToMoveStorages() { + return storageMovementBlocks.size(); + } + + /** + * Get the blocks to move to satisfy the storage media type. + * + * @param numBlocksToMoveTasks + * total number of blocks which will be sent to this datanode for + * block movement. + * + * @return block infos which need to move their storage locations. + */ + public BlockMovingInfo[] getBlocksToMoveStorages(int numBlocksToMoveTasks) { synchronized (storageMovementBlocks) { - // TODO: Presently returning the list of blocks under one trackId. - // Need to limit the list of items into small batches with in trackId - // itself if blocks are many(For example: a file contains many blocks). - return storageMovementBlocks.poll(); + List<BlockMovingInfo> blockMovingInfos = new ArrayList<>(); + for (; !storageMovementBlocks.isEmpty() + && numBlocksToMoveTasks > 0; numBlocksToMoveTasks--) { + blockMovingInfos.add(storageMovementBlocks.poll()); + } + BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos + .size()]; + blkMoveArray = blockMovingInfos.toArray(blkMoveArray); + if (blkMoveArray.length > 0) { + return blkMoveArray; + } + return null; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 3504cb0..4ea41d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@
-40,7 +40,6 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList; import org.apache.hadoop.hdfs.server.common.Util; -import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; @@ -48,6 +47,7 @@ import org.apache.hadoop.hdfs.server.protocol.*; import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.*; import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException; @@ -208,6 +208,8 @@ public class DatanodeManager { */ private final long timeBetweenResendingCachingDirectivesMs; + private final boolean blocksToMoveShareEqualRatio; + DatanodeManager(final BlockManager blockManager, final Namesystem namesystem, final Configuration conf) throws IOException { this.namesystem = namesystem; @@ -332,6 +334,12 @@ public class DatanodeManager { this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong( DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY, DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT); + + // SPS configuration to decide blocks to move can share equal ratio of + // maxtransfers with pending replica and erasure-coded reconstruction tasks + blocksToMoveShareEqualRatio = conf.getBoolean( + 
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT); } private static long getStaleIntervalFromConf(Configuration conf, @@ -1092,13 +1100,14 @@ public class DatanodeManager { // Sets dropSPSWork flag to true, to ensure that // DNA_DROP_SPS_WORK_COMMAND will send to datanode via next heartbeat // response immediately after the node registration. This is - // to avoid a situation, where multiple trackId responses coming from - // different co-odinator datanodes. After SPS monitor time out, it - // will retry the files which were scheduled to the disconnected(for - // long time more than heartbeat expiry) DN, by finding new - // co-ordinator datanode. Now, if the expired datanode reconnects back - // after SPS reschedules, it leads to get different movement results - // from reconnected and new DN co-ordinators. + // to avoid a situation, where multiple block attempt finished + // responses coming from different datanodes. After SPS monitor time + // out, it will retry the files which were scheduled to the + // disconnected(for long time more than heartbeat expiry) DN, by + // finding new datanode. Now, if the expired datanode reconnects back + // after SPS reschedules, it leads to get different movement attempt + // finished report from reconnected and newly datanode which is + // attempting the block movement. nodeS.setDropSPSWork(true); // resolve network location @@ -1678,19 +1687,47 @@ public class DatanodeManager { final List<DatanodeCommand> cmds = new ArrayList<>(); // Allocate _approximately_ maxTransfers pending tasks to DataNode. // NN chooses pending tasks based on the ratio between the lengths of - // replication and erasure-coded block queues. + // replication, erasure-coded block queues and block storage movement + // queues. 
int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks(); int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded(); + int totalBlocksToMove = nodeinfo.getNumberOfBlocksToMoveStorages(); int totalBlocks = totalReplicateBlocks + totalECBlocks; - if (totalBlocks > 0) { - int numReplicationTasks = (int) Math.ceil( - (double) (totalReplicateBlocks * maxTransfers) / totalBlocks); - int numECTasks = (int) Math.ceil( - (double) (totalECBlocks * maxTransfers) / totalBlocks); - + if (totalBlocks > 0 || totalBlocksToMove > 0) { + int numReplicationTasks = 0; + int numECTasks = 0; + int numBlocksToMoveTasks = 0; + // Check blocksToMoveShareEqualRatio configuration is true/false. If true, + // then equally sharing the max transfer. Otherwise gives high priority to + // the pending_replica/erasure-coded tasks and only the delta streams will + // be used for blocks to move tasks. + if (blocksToMoveShareEqualRatio) { + // add blocksToMove count to total blocks so that will get equal share + totalBlocks = totalBlocks + totalBlocksToMove; + numReplicationTasks = (int) Math + .ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks); + numECTasks = (int) Math + .ceil((double) (totalECBlocks * maxTransfers) / totalBlocks); + numBlocksToMoveTasks = (int) Math + .ceil((double) (totalBlocksToMove * maxTransfers) / totalBlocks); + } else { + // Calculate the replica and ec tasks, then pick blocksToMove if there + // is any streams available. 
+ numReplicationTasks = (int) Math + .ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks); + numECTasks = (int) Math + .ceil((double) (totalECBlocks * maxTransfers) / totalBlocks); + int numTasks = numReplicationTasks + numECTasks; + if (numTasks < maxTransfers) { + int remainingMaxTransfers = maxTransfers - numTasks; + numBlocksToMoveTasks = Math.min(totalBlocksToMove, + remainingMaxTransfers); + } + } if (LOG.isDebugEnabled()) { LOG.debug("Pending replication tasks: " + numReplicationTasks - + " erasure-coded tasks: " + numECTasks); + + " erasure-coded tasks: " + numECTasks + " blocks to move tasks: " + + numBlocksToMoveTasks); } // check pending replication tasks List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand( @@ -1706,6 +1743,23 @@ public class DatanodeManager { cmds.add(new BlockECReconstructionCommand( DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList)); } + // check pending block storage movement tasks + if (nodeinfo.shouldDropSPSWork()) { + cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND); + // Set back to false to indicate that the new value has been sent to the + // datanode. + nodeinfo.setDropSPSWork(false); + } else { + // Get pending block storage movement tasks + BlockMovingInfo[] blkStorageMovementInfos = nodeinfo + .getBlocksToMoveStorages(numBlocksToMoveTasks); + + if (blkStorageMovementInfos != null) { + cmds.add(new BlockStorageMovementCommand( + DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, blockPoolId, + Arrays.asList(blkStorageMovementInfos))); + } + } } // check block invalidation @@ -1749,24 +1803,6 @@ public class DatanodeManager { } } - if (nodeinfo.shouldDropSPSWork()) { - cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND); - // Set back to false to indicate that the new value has been sent to the - // datanode. 
- nodeinfo.setDropSPSWork(false); - } - - // check pending block storage movement tasks - BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo - .getBlocksToMoveStorages(); - - if (blkStorageMovementInfosBatch != null) { - cmds.add(new BlockStorageMovementCommand( - DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, - blkStorageMovementInfosBatch.getTrackID(), blockPoolId, - blkStorageMovementInfosBatch.getBlockMovingInfo())); - } - if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index d60fb6d..65ab246 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -785,8 +785,7 @@ class BPOfferService { LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT"); BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) cmd; dn.getStoragePolicySatisfyWorker().processBlockMovingTasks( - blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(), - blkSPSCmd.getBlockMovingTasks()); + blkSPSCmd.getBlockPoolId(), blkSPSCmd.getBlockMovingTasks()); break; case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND: LOG.info("DatanodeCommand action: DNA_DROP_SPS_WORK_COMMAND"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java ---------------------------------------------------------------------- diff 
--git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index f537f49..b7beda4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.client.BlockReportOptions; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -50,7 +51,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -513,8 +514,11 @@ class BPServiceActor implements Runnable { SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) : SlowDiskReports.EMPTY_REPORT; - BlocksStorageMovementResult[] blksMovementResults = - getBlocksMovementResults(); + // Get the blocks storage move attempt finished blocks + List<Block> results = dn.getStoragePolicySatisfyWorker() + 
.getBlocksMovementsStatusHandler().getMoveAttemptFinishedBlocks(); + BlocksStorageMoveAttemptFinished storageMoveAttemptFinishedBlks = + getStorageMoveAttemptFinishedBlocks(results); HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration, reports, @@ -527,7 +531,7 @@ class BPServiceActor implements Runnable { requestBlockReportLease, slowPeers, slowDisks, - blksMovementResults); + storageMoveAttemptFinishedBlks); if (outliersReportDue) { // If the report was due and successfully sent, schedule the next one. @@ -537,20 +541,23 @@ class BPServiceActor implements Runnable { // Remove the blocks movement results after successfully transferring // to namenode. dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler() - .remove(blksMovementResults); + .remove(results); return response; } - private BlocksStorageMovementResult[] getBlocksMovementResults() { - List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn - .getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler() - .getBlksMovementResults(); - BlocksStorageMovementResult[] blksMovementResult = - new BlocksStorageMovementResult[trackIdVsMovementStatus.size()]; - trackIdVsMovementStatus.toArray(blksMovementResult); + private BlocksStorageMoveAttemptFinished getStorageMoveAttemptFinishedBlocks( + List<Block> finishedBlks) { - return blksMovementResult; + if (finishedBlks.isEmpty()) { + return null; + } + + // Create BlocksStorageMoveAttemptFinished with currently finished + // blocks + Block[] blockList = new Block[finishedBlks.size()]; + finishedBlks.toArray(blockList); + return new BlocksStorageMoveAttemptFinished(blockList); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java index f3d2bb6..b3b9fd9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java @@ -21,14 +21,14 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementAttemptFinished; import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,12 +41,12 @@ import org.slf4j.LoggerFactory; public class BlockStorageMovementTracker implements Runnable { private static final Logger LOG = LoggerFactory .getLogger(BlockStorageMovementTracker.class); - private final CompletionService<BlockMovementResult> moverCompletionService; + private final CompletionService<BlockMovementAttemptFinished> moverCompletionService; private final BlocksMovementsStatusHandler blksMovementsStatusHandler; - // Keeps the information - trackID vs its list of blocks - private final Map<Long, List<Future<BlockMovementResult>>> moverTaskFutures; - private final Map<Long, List<BlockMovementResult>> movementResults; + // Keeps the information 
- block vs its list of future move tasks + private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures; + private final Map<Block, List<BlockMovementAttemptFinished>> movementResults; private volatile boolean running = true; @@ -59,7 +59,7 @@ public class BlockStorageMovementTracker implements Runnable { * blocks movements status handler */ public BlockStorageMovementTracker( - CompletionService<BlockMovementResult> moverCompletionService, + CompletionService<BlockMovementAttemptFinished> moverCompletionService, BlocksMovementsStatusHandler handler) { this.moverCompletionService = moverCompletionService; this.moverTaskFutures = new HashMap<>(); @@ -82,32 +82,33 @@ public class BlockStorageMovementTracker implements Runnable { } } try { - Future<BlockMovementResult> future = moverCompletionService.take(); + Future<BlockMovementAttemptFinished> future = + moverCompletionService.take(); if (future != null) { - BlockMovementResult result = future.get(); + BlockMovementAttemptFinished result = future.get(); LOG.debug("Completed block movement. {}", result); - long trackId = result.getTrackId(); - List<Future<BlockMovementResult>> blocksMoving = moverTaskFutures - .get(trackId); + Block block = result.getBlock(); + List<Future<BlockMovementAttemptFinished>> blocksMoving = + moverTaskFutures.get(block); if (blocksMoving == null) { - LOG.warn("Future task doesn't exist for trackId " + trackId); + LOG.warn("Future task doesn't exist for block : {} ", block); continue; } blocksMoving.remove(future); - List<BlockMovementResult> resultPerTrackIdList = - addMovementResultToTrackIdList(result); + List<BlockMovementAttemptFinished> resultPerTrackIdList = + addMovementResultToBlockIdList(result); // Completed all the scheduled blocks movement under this 'trackId'. 
- if (blocksMoving.isEmpty() || moverTaskFutures.get(trackId) == null) { + if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) { synchronized (moverTaskFutures) { - moverTaskFutures.remove(trackId); + moverTaskFutures.remove(block); } if (running) { // handle completed or inprogress blocks movements per trackId. blksMovementsStatusHandler.handle(resultPerTrackIdList); } - movementResults.remove(trackId); + movementResults.remove(block); } } } catch (InterruptedException e) { @@ -123,38 +124,39 @@ public class BlockStorageMovementTracker implements Runnable { } } - private List<BlockMovementResult> addMovementResultToTrackIdList( - BlockMovementResult result) { - long trackId = result.getTrackId(); - List<BlockMovementResult> perTrackIdList; + private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList( + BlockMovementAttemptFinished result) { + Block block = result.getBlock(); + List<BlockMovementAttemptFinished> perBlockIdList; synchronized (movementResults) { - perTrackIdList = movementResults.get(trackId); - if (perTrackIdList == null) { - perTrackIdList = new ArrayList<>(); - movementResults.put(trackId, perTrackIdList); + perBlockIdList = movementResults.get(block); + if (perBlockIdList == null) { + perBlockIdList = new ArrayList<>(); + movementResults.put(block, perBlockIdList); } - perTrackIdList.add(result); + perBlockIdList.add(result); } - return perTrackIdList; + return perBlockIdList; } /** * Add future task to the tracking list to check the completion status of the * block movement. 
* - * @param trackID - * tracking Id + * @param block + * block identifier * @param futureTask * future task used for moving the respective block */ - void addBlock(long trackID, Future<BlockMovementResult> futureTask) { + void addBlock(Block block, + Future<BlockMovementAttemptFinished> futureTask) { synchronized (moverTaskFutures) { - List<Future<BlockMovementResult>> futures = moverTaskFutures - .get(Long.valueOf(trackID)); + List<Future<BlockMovementAttemptFinished>> futures = + moverTaskFutures.get(block); // null for the first task if (futures == null) { futures = new ArrayList<>(); - moverTaskFutures.put(trackID, futures); + moverTaskFutures.put(block, futures); } futures.add(futureTask); // Notify waiting tracker thread about the newly added tasks. @@ -175,16 +177,6 @@ public class BlockStorageMovementTracker implements Runnable { } /** - * @return the list of trackIds which are still waiting to complete all the - * scheduled blocks movements. - */ - Set<Long> getInProgressTrackIds() { - synchronized (moverTaskFutures) { - return moverTaskFutures.keySet(); - } - } - - /** * Sets running flag to false and clear the pending movement result queues.
*/ public void stopTracking() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java index 4e57805..47318f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; -import static org.apache.hadoop.util.Time.monotonicNow; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -32,9 +31,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; @@ -62,7 +59,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; @@ -89,14 +85,11 @@ public class 
StoragePolicySatisfyWorker { private final int moverThreads; private final ExecutorService moveExecutor; - private final CompletionService<BlockMovementResult> moverCompletionService; + private final CompletionService<BlockMovementAttemptFinished> moverCompletionService; private final BlocksMovementsStatusHandler handler; private final BlockStorageMovementTracker movementTracker; private Daemon movementTrackerThread; - private long inprogressTrackIdsCheckInterval = 30 * 1000; // 30seconds. - private long nextInprogressRecheckTime; - public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) { this.datanode = datanode; this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); @@ -111,16 +104,6 @@ public class StoragePolicySatisfyWorker { movementTrackerThread = new Daemon(movementTracker); movementTrackerThread.setName("BlockStorageMovementTracker"); - // Interval to check that the inprogress trackIds. The time interval is - // proportional o the heart beat interval time period. - final long heartbeatIntervalSeconds = conf.getTimeDuration( - DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, - DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS); - inprogressTrackIdsCheckInterval = 5 * heartbeatIntervalSeconds; - // update first inprogress recheck time to a future time stamp. - nextInprogressRecheckTime = monotonicNow() - + inprogressTrackIdsCheckInterval; - // TODO: Needs to manage the number of concurrent moves per DataNode. } @@ -186,30 +169,26 @@ public class StoragePolicySatisfyWorker { * separate thread. Each task will move the block replica to the target node & * wait for the completion. 
* - * @param trackID - * unique tracking identifier - * @param blockPoolID - * block pool ID + * @param blockPoolID block pool identifier + * * @param blockMovingInfos * list of blocks to be moved */ - public void processBlockMovingTasks(long trackID, String blockPoolID, - Collection<BlockMovingInfo> blockMovingInfos) { + public void processBlockMovingTasks(final String blockPoolID, + final Collection<BlockMovingInfo> blockMovingInfos) { LOG.debug("Received BlockMovingTasks {}", blockMovingInfos); for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { - assert blkMovingInfo.getSources().length == blkMovingInfo - .getTargets().length; - for (int i = 0; i < blkMovingInfo.getSources().length; i++) { - DatanodeInfo target = blkMovingInfo.getTargets()[i]; - BlockMovingTask blockMovingTask = new BlockMovingTask( - trackID, blockPoolID, blkMovingInfo.getBlock(), - blkMovingInfo.getSources()[i], target, - blkMovingInfo.getSourceStorageTypes()[i], - blkMovingInfo.getTargetStorageTypes()[i]); - Future<BlockMovementResult> moveCallable = moverCompletionService - .submit(blockMovingTask); - movementTracker.addBlock(trackID, moveCallable); - } + StorageType sourceStorageType = blkMovingInfo.getSourceStorageType(); + StorageType targetStorageType = blkMovingInfo.getTargetStorageType(); + assert sourceStorageType != targetStorageType + : "Source and Target storage type shouldn't be same!"; + BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID, + blkMovingInfo.getBlock(), blkMovingInfo.getSource(), + blkMovingInfo.getTarget(), sourceStorageType, targetStorageType); + Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService + .submit(blockMovingTask); + movementTracker.addBlock(blkMovingInfo.getBlock(), + moveCallable); } } @@ -217,8 +196,7 @@ public class StoragePolicySatisfyWorker { * This class encapsulates the process of moving the block replica to the * given target and wait for the response. 
*/ - private class BlockMovingTask implements Callable<BlockMovementResult> { - private final long trackID; + private class BlockMovingTask implements Callable<BlockMovementAttemptFinished> { private final String blockPoolID; private final Block block; private final DatanodeInfo source; @@ -226,10 +204,9 @@ public class StoragePolicySatisfyWorker { private final StorageType srcStorageType; private final StorageType targetStorageType; - BlockMovingTask(long trackID, String blockPoolID, Block block, + BlockMovingTask(String blockPoolID, Block block, DatanodeInfo source, DatanodeInfo target, StorageType srcStorageType, StorageType targetStorageType) { - this.trackID = trackID; this.blockPoolID = blockPoolID; this.block = block; this.source = source; @@ -239,23 +216,26 @@ public class StoragePolicySatisfyWorker { } @Override - public BlockMovementResult call() { + public BlockMovementAttemptFinished call() { BlockMovementStatus status = moveBlock(); - return new BlockMovementResult(trackID, block.getBlockId(), target, - status); + return new BlockMovementAttemptFinished(block, source, target, status); } private BlockMovementStatus moveBlock() { LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy " - + "storageType, sourceStoragetype:{} and destinStoragetype:{}", + + "storageType, sourceStoragetype:{} and destinStoragetype:{}", block, source, target, srcStorageType, targetStorageType); Socket sock = null; DataOutputStream out = null; DataInputStream in = null; try { + datanode.incrementXmitsInProgress(); + ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block); DNConf dnConf = datanode.getDnConf(); - String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname()); + + String dnAddr = datanode.getDatanodeId() + .getXferAddr(dnConf.getConnectToDnViaHostname()); sock = datanode.newSocket(); NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), dnConf.getSocketTimeout()); @@ -297,9 +277,10 @@ public class StoragePolicySatisfyWorker { 
LOG.warn( "Failed to move block:{} from src:{} to destin:{} to satisfy " + "storageType:{}", - block, source, target, targetStorageType, e); + block, source, target, targetStorageType, e); return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE; } finally { + datanode.decrementXmitsInProgress(); IOUtils.closeStream(out); IOUtils.closeStream(in); IOUtils.closeSocket(sock); @@ -357,29 +338,25 @@ public class StoragePolicySatisfyWorker { } /** - * This class represents result from a block movement task. This will have the + * This class represents status from a block movement task. This will have the * information of the task which was successful or failed due to errors. */ - static class BlockMovementResult { - private final long trackId; - private final long blockId; + static class BlockMovementAttemptFinished { + private final Block block; + private final DatanodeInfo src; private final DatanodeInfo target; private final BlockMovementStatus status; - BlockMovementResult(long trackId, long blockId, + BlockMovementAttemptFinished(Block block, DatanodeInfo src, DatanodeInfo target, BlockMovementStatus status) { - this.trackId = trackId; - this.blockId = blockId; + this.block = block; + this.src = src; this.target = target; this.status = status; } - long getTrackId() { - return trackId; - } - - long getBlockId() { - return blockId; + Block getBlock() { + return block; } BlockMovementStatus getStatus() { @@ -388,99 +365,79 @@ public class StoragePolicySatisfyWorker { @Override public String toString() { - return new StringBuilder().append("Block movement result(\n ") - .append("track id: ").append(trackId).append(" block id: ") - .append(blockId).append(" target node: ").append(target) + return new StringBuilder().append("Block movement attempt finished(\n ") + .append(" block : ") + .append(block).append(" src node: ").append(src) + .append(" target node: ").append(target) .append(" movement status: ").append(status).append(")").toString(); } } /** * Blocks 
movements status handler, which is used to collect details of the - * completed or inprogress list of block movements and this status(success or - * failure or inprogress) will be send to the namenode via heartbeat. + * completed block movements and it will send these attempted finished(with + * success or failure) blocks to the namenode via heartbeat. */ - class BlocksMovementsStatusHandler { - private final List<BlocksStorageMovementResult> trackIdVsMovementStatus = + public static class BlocksMovementsStatusHandler { + private final List<Block> blockIdVsMovementStatus = new ArrayList<>(); /** - * Collect all the block movement results. Later this will be send to - * namenode via heart beat. + * Collect all the storage movement attempt finished blocks. Later this will + * be sent to namenode via heart beat. * - * @param results - * result of all the block movements per trackId + * @param moveAttemptFinishedBlks + * set of storage movement attempt finished blocks */ - void handle(List<BlockMovementResult> resultsPerTrackId) { - BlocksStorageMovementResult.Status status = - BlocksStorageMovementResult.Status.SUCCESS; - long trackId = -1; - for (BlockMovementResult blockMovementResult : resultsPerTrackId) { - trackId = blockMovementResult.getTrackId(); - if (blockMovementResult.status == - BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) { - status = BlocksStorageMovementResult.Status.FAILURE; - // If any of the block movement is failed, then mark as failure so - // that namenode can take a decision to retry the blocks associated to - // the given trackId. - break; - } - } + void handle(List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) { + List<Block> blocks = new ArrayList<>(); - // Adding to the tracking results list. Later this will be send to + for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) { + blocks.add(item.getBlock()); + } + // Adding to the tracking report list. Later this will be sent to // namenode via datanode heartbeat.
- synchronized (trackIdVsMovementStatus) { - trackIdVsMovementStatus.add( - new BlocksStorageMovementResult(trackId, status)); + synchronized (blockIdVsMovementStatus) { + blockIdVsMovementStatus.addAll(blocks); } } /** - * @return unmodifiable list of blocks storage movement results. + * @return unmodifiable list of storage movement attempt finished blocks. */ - List<BlocksStorageMovementResult> getBlksMovementResults() { - List<BlocksStorageMovementResult> movementResults = new ArrayList<>(); - // 1. Adding all the completed trackids. - synchronized (trackIdVsMovementStatus) { - if (trackIdVsMovementStatus.size() > 0) { - movementResults = Collections - .unmodifiableList(trackIdVsMovementStatus); + List<Block> getMoveAttemptFinishedBlocks() { + List<Block> moveAttemptFinishedBlks = new ArrayList<>(); + // 1. Adding all the completed block ids. + synchronized (blockIdVsMovementStatus) { + if (blockIdVsMovementStatus.size() > 0) { + moveAttemptFinishedBlks = Collections + .unmodifiableList(blockIdVsMovementStatus); } } - // 2. Adding the in progress track ids after those which are completed. - Set<Long> inProgressTrackIds = getInProgressTrackIds(); - for (Long trackId : inProgressTrackIds) { - movementResults.add(new BlocksStorageMovementResult(trackId, - BlocksStorageMovementResult.Status.IN_PROGRESS)); - } - return movementResults; + return moveAttemptFinishedBlks; } /** - * Remove the blocks storage movement results. + * Remove the storage movement attempt finished blocks from the tracking + * list. 
* - * @param results - * set of blocks storage movement results + * @param moveAttemptFinishedBlks + * set of storage movement attempt finished blocks */ - void remove(BlocksStorageMovementResult[] results) { - if (results != null) { - synchronized (trackIdVsMovementStatus) { - for (BlocksStorageMovementResult blocksMovementResult : results) { - trackIdVsMovementStatus.remove(blocksMovementResult); - } - } + void remove(List<Block> moveAttemptFinishedBlks) { + if (moveAttemptFinishedBlks != null) { + blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks); } } /** - * Clear the trackID vs movement status tracking map. + * Clear the blockID vs movement status tracking map. */ void removeAll() { - synchronized (trackIdVsMovementStatus) { - trackIdVsMovementStatus.clear(); + synchronized (blockIdVsMovementStatus) { + blockIdVsMovementStatus.clear(); } } - } @VisibleForTesting @@ -498,23 +455,4 @@ public class StoragePolicySatisfyWorker { movementTracker.removeAll(); handler.removeAll(); } - - /** - * Gets list of trackids which are inprogress. Will do collection periodically - * on 'dfs.datanode.storage.policy.satisfier.worker.inprogress.recheck.time. - * millis' interval. - * - * @return collection of trackids which are inprogress - */ - private Set<Long> getInProgressTrackIds() { - Set<Long> trackIds = new HashSet<>(); - long now = monotonicNow(); - if (nextInprogressRecheckTime >= now) { - trackIds = movementTracker.getInProgressTrackIds(); - - // schedule next re-check interval - nextInprogressRecheckTime = now + inprogressTrackIdsCheckInterval; - } - return trackIds; - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
