HDFS-12570: [SPS]: Refactor Co-ordinator datanode logic to track the block 
storage movements. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2666d51f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2666d51f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2666d51f

Branch: refs/heads/HDFS-10285
Commit: 2666d51fb86201fd3c64d8de52ef39c73c93c203
Parents: a275162
Author: Uma Maheswara Rao G <[email protected]>
Authored: Thu Oct 12 17:17:51 2017 -0700
Committer: Uma Maheswara Rao G <[email protected]>
Committed: Thu Oct 12 17:17:51 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  54 +--
 .../DatanodeProtocolClientSideTranslatorPB.java |  12 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   4 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 150 +++-----
 .../blockmanagement/DatanodeDescriptor.java     |  50 ++-
 .../server/blockmanagement/DatanodeManager.java | 104 ++++--
 .../hdfs/server/datanode/BPOfferService.java    |   3 +-
 .../hdfs/server/datanode/BPServiceActor.java    |  33 +-
 .../datanode/BlockStorageMovementTracker.java   |  80 ++---
 .../datanode/StoragePolicySatisfyWorker.java    | 214 ++++--------
 .../BlockStorageMovementAttemptedItems.java     | 299 ++++------------
 .../BlockStorageMovementInfosBatch.java         |  61 ----
 .../hdfs/server/namenode/FSNamesystem.java      |  11 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   7 +-
 .../server/namenode/StoragePolicySatisfier.java | 343 ++++++++++---------
 .../protocol/BlockStorageMovementCommand.java   |  99 ++----
 .../BlocksStorageMoveAttemptFinished.java       |  48 +++
 .../protocol/BlocksStorageMovementResult.java   |  74 ----
 .../hdfs/server/protocol/DatanodeProtocol.java  |   5 +-
 .../src/main/proto/DatanodeProtocol.proto       |  30 +-
 .../src/main/resources/hdfs-default.xml         |  21 +-
 .../src/site/markdown/ArchivalStorage.md        |   6 +-
 .../TestNameNodePrunesMissingStorages.java      |   5 +-
 .../datanode/InternalDataNodeTestUtils.java     |   4 +-
 .../server/datanode/TestBPOfferService.java     |   4 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   4 +-
 .../server/datanode/TestDataNodeLifeline.java   |   6 +-
 .../TestDatanodeProtocolRetryPolicy.java        |   4 +-
 .../server/datanode/TestFsDatasetCache.java     |   4 +-
 .../TestStoragePolicySatisfyWorker.java         |  52 ++-
 .../hdfs/server/datanode/TestStorageReport.java |   4 +-
 .../server/namenode/NNThroughputBenchmark.java  |   6 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   4 +-
 .../TestBlockStorageMovementAttemptedItems.java | 145 ++++----
 .../hdfs/server/namenode/TestDeadDatanode.java  |   4 +-
 .../namenode/TestStoragePolicySatisfier.java    | 115 ++++++-
 ...stStoragePolicySatisfierWithStripedFile.java |  20 +-
 37 files changed, 931 insertions(+), 1158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e74c90c..0686e83 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -27,7 +27,7 @@ import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFau
 import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
 import org.apache.hadoop.http.HttpConfig;
 
-/** 
+/**
  * This class contains constants for configuration keys and default values
  * used in hdfs.
  */
@@ -306,7 +306,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 
   public static final String  
DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC = 
"dfs.namenode.lazypersist.file.scrub.interval.sec";
   public static final int     
DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT = 5 * 60;
-  
+
   public static final String  DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = 
"dfs.namenode.edits.noeditlogchannelflush";
   public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT 
= false;
 
@@ -366,11 +366,11 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   // The default value of the time interval for marking datanodes as stale
   public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = 
"dfs.namenode.stale.datanode.interval";
   public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 
1000; // 30s
-  // The stale interval cannot be too small since otherwise this may cause too 
frequent churn on stale states. 
-  // This value uses the times of heartbeat interval to define the minimum 
value for stale interval.  
+  // The stale interval cannot be too small since otherwise this may cause too 
frequent churn on stale states.
+  // This value uses the times of heartbeat interval to define the minimum 
value for stale interval.
   public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY 
= "dfs.namenode.stale.datanode.minimum.interval";
   public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT 
= 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
-  
+
   // When the percentage of stale datanodes reaches this ratio,
   // allow writing to stale nodes to prevent hotspots.
   public static final String 
DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = 
"dfs.namenode.write.stale.datanode.ratio";
@@ -556,11 +556,15 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   public static final String 
DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY =
       "dfs.storage.policy.satisfier.recheck.timeout.millis";
   public static final int 
DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT =
-      5 * 60 * 1000;
+      1 * 60 * 1000;
   public static final String 
DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY =
       "dfs.storage.policy.satisfier.self.retry.timeout.millis";
   public static final int 
DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT =
-      20 * 60 * 1000;
+      5 * 60 * 1000;
+  public static final String 
DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY =
+      "dfs.storage.policy.satisfier.low.max-streams.preference";
+  public static final boolean 
DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT =
+      false;
 
   public static final String  DFS_DATANODE_ADDRESS_KEY = 
"dfs.datanode.address";
   public static final int     DFS_DATANODE_DEFAULT_PORT = 9866;
@@ -930,7 +934,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_JOURNALNODE_RPC_ADDRESS_KEY = 
"dfs.journalnode.rpc-address";
   public static final int     DFS_JOURNALNODE_RPC_PORT_DEFAULT = 8485;
   public static final String  DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT = "0.0.0.0:" 
+ DFS_JOURNALNODE_RPC_PORT_DEFAULT;
-    
+
   public static final String  DFS_JOURNALNODE_HTTP_ADDRESS_KEY = 
"dfs.journalnode.http-address";
   public static final int     DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480;
   public static final String  DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT = 
"0.0.0.0:" + DFS_JOURNALNODE_HTTP_PORT_DEFAULT;
@@ -951,7 +955,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // Journal-node related configs for the client side.
   public static final String  DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = 
"dfs.qjournal.queued-edits.limit.mb";
   public static final int     DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT = 10;
-  
+
   // Quorum-journal timeouts for various operations. Unlikely to need
   // to be tweaked, but configurable just in case.
   public static final String  DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY = 
"dfs.qjournal.start-segment.timeout.ms";
@@ -970,17 +974,17 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   public static final int     DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 
120000;
   public static final int     DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
   public static final int     DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
-  
+
   public static final String DFS_MAX_NUM_BLOCKS_TO_LOG_KEY = 
"dfs.namenode.max-num-blocks-to-log";
   public static final long   DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT = 1000l;
-  
+
   public static final String DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY = 
"dfs.namenode.enable.retrycache";
   public static final boolean DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT = true;
   public static final String DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY = 
"dfs.namenode.retrycache.expirytime.millis";
   public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT 
= 600000; // 10 minutes
   public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = 
"dfs.namenode.retrycache.heap.percent";
   public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 
0.03f;
-  
+
   // Hidden configuration undocumented in hdfs-site. xml
   // Timeout to wait for block receiver and responder thread to stop
   public static final String DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY = 
"dfs.datanode.xceiver.stop.timeout.millis";
@@ -1025,7 +1029,7 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
       = HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_DEFAULT;
 
   // Handling unresolved DN topology mapping
-  public static final String  DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY = 
+  public static final String  DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY =
       "dfs.namenode.reject-unresolved-dn-topology-mapping";
   public static final boolean 
DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT =
       false;
@@ -1125,13 +1129,13 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
       "dfs.use.dfs.network.topology";
   public static final boolean DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT = true;
 
-  // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry 
+  // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
       = HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY;
   @Deprecated
   public static final boolean DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT
-      = HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT; 
+      = HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT;
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_SPEC_KEY
       = HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY;
@@ -1163,7 +1167,7 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   public static final int     DFS_CLIENT_RETRY_WINDOW_BASE_DEFAULT
       = HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT;
 
-  // dfs.client.failover confs are moved to HdfsClientConfigKeys.Failover 
+  // dfs.client.failover confs are moved to HdfsClientConfigKeys.Failover
   @Deprecated
   public static final String  DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX
       = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX;
@@ -1197,8 +1201,8 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   @Deprecated
   public static final int     
DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT
       = 
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT;
-  
-  // dfs.client.write confs are moved to HdfsClientConfigKeys.Write 
+
+  // dfs.client.write confs are moved to HdfsClientConfigKeys.Write
   @Deprecated
   public static final String  DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY
       = HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_KEY;
@@ -1236,7 +1240,7 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   public static final long    
DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT
       = 
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT;
 
-  // dfs.client.block.write confs are moved to HdfsClientConfigKeys.BlockWrite 
+  // dfs.client.block.write confs are moved to HdfsClientConfigKeys.BlockWrite
   @Deprecated
   public static final String  DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY
       = HdfsClientConfigKeys.BlockWrite.RETRIES_KEY;
@@ -1274,13 +1278,13 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   public static final boolean 
DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT
       = 
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT;
 
-  // dfs.client.read confs are moved to HdfsClientConfigKeys.Read 
+  // dfs.client.read confs are moved to HdfsClientConfigKeys.Read
   @Deprecated
   public static final String  DFS_CLIENT_READ_PREFETCH_SIZE_KEY
-      = HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY; 
+      = HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY;
   @Deprecated
   public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_KEY
-      = HdfsClientConfigKeys.Read.ShortCircuit.KEY; 
+      = HdfsClientConfigKeys.Read.ShortCircuit.KEY;
   @Deprecated
   public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT
       = HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT;
@@ -1309,7 +1313,7 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   public static final long    
DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT
       = HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT;
 
-  // dfs.client.mmap confs are moved to HdfsClientConfigKeys.Mmap 
+  // dfs.client.mmap confs are moved to HdfsClientConfigKeys.Mmap
   @Deprecated
   public static final String  DFS_CLIENT_MMAP_ENABLED
       = HdfsClientConfigKeys.Mmap.ENABLED_KEY;
@@ -1335,7 +1339,7 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   public static final long    DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT
       = HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_DEFAULT;
 
-  // dfs.client.short.circuit confs are moved to 
HdfsClientConfigKeys.ShortCircuit 
+  // dfs.client.short.circuit confs are moved to 
HdfsClientConfigKeys.ShortCircuit
   @Deprecated
   public static final String  
DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS
       = HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY;
@@ -1343,7 +1347,7 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   public static final long    
DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT
       = HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT;
 
-  // dfs.client.hedged.read confs are moved to HdfsClientConfigKeys.HedgedRead 
+  // dfs.client.hedged.read confs are moved to HdfsClientConfigKeys.HedgedRead
   @Deprecated
   public static final String  DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS
       = HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 9dd87d0..dcc0705 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -48,7 +48,7 @@ import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -140,7 +140,8 @@ public class DatanodeProtocolClientSideTranslatorPB 
implements
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
       @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMovementResult[] blksMovementResults) throws IOException {
+      BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
+          throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
         .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -165,8 +166,11 @@ public class DatanodeProtocolClientSideTranslatorPB 
implements
     }
 
     // Adding blocks movement results to the heart beat request.
-    builder.addAllBlksMovementResults(
-        PBHelper.convertBlksMovResults(blksMovementResults));
+    if (storageMovementFinishedBlks != null
+        && storageMovementFinishedBlks.getBlocks() != null) {
+      builder.setStorageMoveAttemptFinishedBlks(
+          PBHelper.convertBlksMovReport(storageMovementFinishedBlks));
+    }
 
     HeartbeatResponseProto resp;
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 40458ef..b5bb80a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -123,8 +123,8 @@ public class DatanodeProtocolServerSideTranslatorPB 
implements
           volumeFailureSummary, request.getRequestFullBlockReportLease(),
           PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
           PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
-          PBHelper.convertBlksMovResults(
-              request.getBlksMovementResultsList()));
+          PBHelper.convertBlksMovReport(
+              request.getStorageMoveAttemptFinishedBlks()));
     } catch (IOException e) {
       throw new ServiceException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 5044c0b..d329f9e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -40,6 +40,7 @@ import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBand
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
+import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockMovingInfoProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
@@ -54,11 +55,11 @@ import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerRepo
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
-import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
-import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;
+import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMoveAttemptFinishedProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto;
@@ -100,8 +101,7 @@ import 
org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
-import 
org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import 
org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import 
org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -967,59 +967,27 @@ public class PBHelper {
     return SlowDiskReports.create(slowDisksMap);
   }
 
-  public static BlocksStorageMovementResult[] convertBlksMovResults(
-      List<BlocksStorageMovementResultProto> protos) {
-    BlocksStorageMovementResult[] results =
-        new BlocksStorageMovementResult[protos.size()];
-    for (int i = 0; i < protos.size(); i++) {
-      BlocksStorageMovementResultProto resultProto = protos.get(i);
-      BlocksStorageMovementResult.Status status;
-      switch (resultProto.getStatus()) {
-      case SUCCESS:
-        status = Status.SUCCESS;
-        break;
-      case FAILURE:
-        status = Status.FAILURE;
-        break;
-      case IN_PROGRESS:
-        status = Status.IN_PROGRESS;
-        break;
-      default:
-        throw new AssertionError("Unknown status: " + resultProto.getStatus());
-      }
-      results[i] = new BlocksStorageMovementResult(resultProto.getTrackID(),
-          status);
+  public static BlocksStorageMoveAttemptFinished convertBlksMovReport(
+      BlocksStorageMoveAttemptFinishedProto proto) {
+
+    List<BlockProto> blocksList = proto.getBlocksList();
+    Block[] blocks = new Block[blocksList.size()];
+    for (int i = 0; i < blocksList.size(); i++) {
+      BlockProto blkProto = blocksList.get(i);
+      blocks[i] = PBHelperClient.convert(blkProto);
     }
-    return results;
+    return new BlocksStorageMoveAttemptFinished(blocks);
   }
 
-  public static List<BlocksStorageMovementResultProto> convertBlksMovResults(
-      BlocksStorageMovementResult[] blocksMovementResults) {
-    List<BlocksStorageMovementResultProto> blocksMovementResultsProto =
-        new ArrayList<>();
-    BlocksStorageMovementResultProto.Builder builder =
-        BlocksStorageMovementResultProto.newBuilder();
-    for (int i = 0; i < blocksMovementResults.length; i++) {
-      BlocksStorageMovementResult report = blocksMovementResults[i];
-      builder.setTrackID(report.getTrackId());
-      BlocksStorageMovementResultProto.Status status;
-      switch (report.getStatus()) {
-      case SUCCESS:
-        status = BlocksStorageMovementResultProto.Status.SUCCESS;
-        break;
-      case FAILURE:
-        status = BlocksStorageMovementResultProto.Status.FAILURE;
-        break;
-      case IN_PROGRESS:
-        status = BlocksStorageMovementResultProto.Status.IN_PROGRESS;
-        break;
-      default:
-        throw new AssertionError("Unknown status: " + report.getStatus());
-      }
-      builder.setStatus(status);
-      blocksMovementResultsProto.add(builder.build());
+  public static BlocksStorageMoveAttemptFinishedProto convertBlksMovReport(
+      BlocksStorageMoveAttemptFinished blocksMoveAttemptFinished) {
+    BlocksStorageMoveAttemptFinishedProto.Builder builder =
+        BlocksStorageMoveAttemptFinishedProto.newBuilder();
+    Block[] blocks = blocksMoveAttemptFinished.getBlocks();
+    for (Block block : blocks) {
+      builder.addBlocks(PBHelperClient.convert(block));
     }
-    return blocksMovementResultsProto;
+    return builder.build();
   }
 
   public static JournalInfo convert(JournalInfoProto info) {
@@ -1183,34 +1151,34 @@ public class PBHelper {
     BlockStorageMovementCommandProto.Builder builder =
         BlockStorageMovementCommandProto.newBuilder();
 
-    builder.setTrackID(blkStorageMovementCmd.getTrackID());
     builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId());
     Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd
         .getBlockMovingTasks();
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      builder.addBlockStorageMovement(
-          convertBlockMovingInfo(blkMovingInfo));
+      builder.addBlockMovingInfo(convertBlockMovingInfo(blkMovingInfo));
     }
     return builder.build();
   }
 
-  private static BlockStorageMovementProto convertBlockMovingInfo(
+  private static BlockMovingInfoProto convertBlockMovingInfo(
       BlockMovingInfo blkMovingInfo) {
-    BlockStorageMovementProto.Builder builder = BlockStorageMovementProto
+    BlockMovingInfoProto.Builder builder = BlockMovingInfoProto
         .newBuilder();
     builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock()));
 
-    DatanodeInfo[] sourceDnInfos = blkMovingInfo.getSources();
-    builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+    DatanodeInfo sourceDnInfo = blkMovingInfo.getSource();
+    builder.setSourceDnInfo(PBHelperClient.convert(sourceDnInfo));
 
-    DatanodeInfo[] targetDnInfos = blkMovingInfo.getTargets();
-    builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+    DatanodeInfo targetDnInfo = blkMovingInfo.getTarget();
+    builder.setTargetDnInfo(PBHelperClient.convert(targetDnInfo));
 
-    StorageType[] sourceStorageTypes = blkMovingInfo.getSourceStorageTypes();
-    
builder.setSourceStorageTypes(convertStorageTypesProto(sourceStorageTypes));
+    StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
+    builder.setSourceStorageType(
+        PBHelperClient.convertStorageType(sourceStorageType));
 
-    StorageType[] targetStorageTypes = blkMovingInfo.getTargetStorageTypes();
-    
builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+    StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
+    builder.setTargetStorageType(
+        PBHelperClient.convertStorageType(targetStorageType));
 
     return builder.build();
   }
@@ -1218,42 +1186,38 @@ public class PBHelper {
   private static DatanodeCommand convert(
       BlockStorageMovementCommandProto blkStorageMovementCmdProto) {
     Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
-    List<BlockStorageMovementProto> blkSPSatisfyList =
-        blkStorageMovementCmdProto.getBlockStorageMovementList();
-    for (BlockStorageMovementProto blkSPSatisfy : blkSPSatisfyList) {
+    List<BlockMovingInfoProto> blkSPSatisfyList =
+        blkStorageMovementCmdProto.getBlockMovingInfoList();
+    for (BlockMovingInfoProto blkSPSatisfy : blkSPSatisfyList) {
       blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy));
     }
     return new BlockStorageMovementCommand(
         DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
-        blkStorageMovementCmdProto.getTrackID(),
         blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos);
   }
 
   private static BlockMovingInfo convertBlockMovingInfo(
-      BlockStorageMovementProto blockStoragePolicySatisfyProto) {
-    BlockProto blockProto = blockStoragePolicySatisfyProto.getBlock();
+      BlockMovingInfoProto blockStorageMovingInfoProto) {
+    BlockProto blockProto = blockStorageMovingInfoProto.getBlock();
     Block block = PBHelperClient.convert(blockProto);
 
-    DatanodeInfosProto sourceDnInfosProto = blockStoragePolicySatisfyProto
-        .getSourceDnInfos();
-    DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto);
-
-    DatanodeInfosProto targetDnInfosProto = blockStoragePolicySatisfyProto
-        .getTargetDnInfos();
-    DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto);
-
-    StorageTypesProto srcStorageTypesProto = blockStoragePolicySatisfyProto
-        .getSourceStorageTypes();
-    StorageType[] srcStorageTypes = PBHelperClient.convertStorageTypes(
-        srcStorageTypesProto.getStorageTypesList(),
-        srcStorageTypesProto.getStorageTypesList().size());
-
-    StorageTypesProto targetStorageTypesProto = blockStoragePolicySatisfyProto
-        .getTargetStorageTypes();
-    StorageType[] targetStorageTypes = PBHelperClient.convertStorageTypes(
-        targetStorageTypesProto.getStorageTypesList(),
-        targetStorageTypesProto.getStorageTypesList().size());
-    return new BlockMovingInfo(block, sourceDnInfos, targetDnInfos,
-        srcStorageTypes, targetStorageTypes);
+    DatanodeInfoProto sourceDnInfoProto = blockStorageMovingInfoProto
+        .getSourceDnInfo();
+    DatanodeInfo sourceDnInfo = PBHelperClient.convert(sourceDnInfoProto);
+
+    DatanodeInfoProto targetDnInfoProto = blockStorageMovingInfoProto
+        .getTargetDnInfo();
+    DatanodeInfo targetDnInfo = PBHelperClient.convert(targetDnInfoProto);
+    StorageTypeProto srcStorageTypeProto = blockStorageMovingInfoProto
+        .getSourceStorageType();
+    StorageType srcStorageType = PBHelperClient
+        .convertStorageType(srcStorageTypeProto);
+
+    StorageTypeProto targetStorageTypeProto = blockStorageMovingInfoProto
+        .getTargetStorageType();
+    StorageType targetStorageType = PBHelperClient
+        .convertStorageType(targetStorageTypeProto);
+    return new BlockMovingInfo(block, sourceDnInfo, targetDnInfo,
+        srcStorageType, targetStorageType);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 95cb3a5..9210e59 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
@@ -211,7 +210,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * A queue of blocks corresponding to trackID for moving its storage
    * placements by this datanode.
    */
-  private final Queue<BlockStorageMovementInfosBatch> storageMovementBlocks =
+  private final Queue<BlockMovingInfo> storageMovementBlocks =
       new LinkedList<>();
   private volatile boolean dropSPSWork = false;
 
@@ -1029,30 +1028,45 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /**
    * Add the block infos which needs to move its storage locations.
    *
-   * @param trackID
-   *          - unique identifier which will be used for tracking the given set
-   *          of blocks movement completion.
-   * @param storageMismatchedBlocks
-   *          - storage mismatched block infos
+   * @param blkMovingInfo
+   *          - storage mismatched block info
    */
-  public void addBlocksToMoveStorage(long trackID,
-      List<BlockMovingInfo> storageMismatchedBlocks) {
+  public void addBlocksToMoveStorage(BlockMovingInfo blkMovingInfo) {
     synchronized (storageMovementBlocks) {
-      storageMovementBlocks.offer(
-          new BlockStorageMovementInfosBatch(trackID, 
storageMismatchedBlocks));
+      storageMovementBlocks.offer(blkMovingInfo);
     }
   }
 
   /**
-   * @return block infos which needs to move its storage locations. This 
returns
-   *         list of blocks under one trackId.
+   * Return the number of blocks queued up for movement.
    */
-  public BlockStorageMovementInfosBatch getBlocksToMoveStorages() {
+  public int getNumberOfBlocksToMoveStorages() {
+    return storageMovementBlocks.size();
+  }
+
+  /**
+   * Get the blocks to move to satisfy the storage media type.
+   *
+   * @param numBlocksToMoveTasks
+   *          total number of blocks which will be send to this datanode for
+   *          block movement.
+   *
+   * @return block infos which needs to move its storage locations.
+   */
+  public BlockMovingInfo[] getBlocksToMoveStorages(int numBlocksToMoveTasks) {
     synchronized (storageMovementBlocks) {
-      // TODO: Presently returning the list of blocks under one trackId.
-      // Need to limit the list of items into small batches with in trackId
-      // itself if blocks are many(For example: a file contains many blocks).
-      return storageMovementBlocks.poll();
+      List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+      for (; !storageMovementBlocks.isEmpty()
+          && numBlocksToMoveTasks > 0; numBlocksToMoveTasks--) {
+        blockMovingInfos.add(storageMovementBlocks.poll());
+      }
+      BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos
+          .size()];
+      blkMoveArray = blockMovingInfos.toArray(blkMoveArray);
+      if (blkMoveArray.length > 0) {
+        return blkMoveArray;
+      }
+      return null;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 3504cb0..4ea41d9 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -40,7 +40,6 @@ import 
org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import 
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import 
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hdfs.server.protocol.*;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
+import 
org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -208,6 +208,8 @@ public class DatanodeManager {
    */
   private final long timeBetweenResendingCachingDirectivesMs;
 
+  private final boolean blocksToMoveShareEqualRatio;
+
   DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
@@ -332,6 +334,12 @@ public class DatanodeManager {
     this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY,
         
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
+
+    // SPS configuration to decide blocks to move can share equal ratio of
+    // maxtransfers with pending replica and erasure-coded reconstruction tasks
+    blocksToMoveShareEqualRatio = conf.getBoolean(
+        
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
+        
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT);
   }
 
   private static long getStaleIntervalFromConf(Configuration conf,
@@ -1092,13 +1100,14 @@ public class DatanodeManager {
           // Sets dropSPSWork flag to true, to ensure that
           // DNA_DROP_SPS_WORK_COMMAND will send to datanode via next heartbeat
           // response immediately after the node registration. This is
-          // to avoid a situation, where multiple trackId responses coming from
-          // different co-odinator datanodes. After SPS monitor time out, it
-          // will retry the files which were scheduled to the disconnected(for
-          // long time more than heartbeat expiry) DN, by finding new
-          // co-ordinator datanode. Now, if the expired datanode reconnects 
back
-          // after SPS reschedules, it leads to get different movement results
-          // from reconnected and new DN co-ordinators.
+          // to avoid a situation, where multiple block attempt finished
+          // responses coming from different datanodes. After SPS monitor time
+          // out, it will retry the files which were scheduled to the
+          // disconnected(for long time more than heartbeat expiry) DN, by
+          // finding new datanode. Now, if the expired datanode reconnects back
+          // after SPS reschedules, it leads to get different movement attempt
+          // finished report from reconnected and newly datanode which is
+          // attempting the block movement.
           nodeS.setDropSPSWork(true);
 
           // resolve network location
@@ -1678,19 +1687,47 @@ public class DatanodeManager {
     final List<DatanodeCommand> cmds = new ArrayList<>();
     // Allocate _approximately_ maxTransfers pending tasks to DataNode.
     // NN chooses pending tasks based on the ratio between the lengths of
-    // replication and erasure-coded block queues.
+    // replication, erasure-coded block queues and block storage movement
+    // queues.
     int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
     int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
+    int totalBlocksToMove = nodeinfo.getNumberOfBlocksToMoveStorages();
     int totalBlocks = totalReplicateBlocks + totalECBlocks;
-    if (totalBlocks > 0) {
-      int numReplicationTasks = (int) Math.ceil(
-          (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
-      int numECTasks = (int) Math.ceil(
-          (double) (totalECBlocks * maxTransfers) / totalBlocks);
-
+    if (totalBlocks > 0 || totalBlocksToMove > 0) {
+      int numReplicationTasks = 0;
+      int numECTasks = 0;
+      int numBlocksToMoveTasks = 0;
+      // Check blocksToMoveShareEqualRatio configuration is true/false. If 
true,
+      // then equally sharing the max transfer. Otherwise gives high priority 
to
+      // the pending_replica/erasure-coded tasks and only the delta streams 
will
+      // be used for blocks to move tasks.
+      if (blocksToMoveShareEqualRatio) {
+        // add blocksToMove count to total blocks so that will get equal share
+        totalBlocks = totalBlocks + totalBlocksToMove;
+        numReplicationTasks = (int) Math
+            .ceil((double) (totalReplicateBlocks * maxTransfers) / 
totalBlocks);
+        numECTasks = (int) Math
+            .ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
+        numBlocksToMoveTasks = (int) Math
+            .ceil((double) (totalBlocksToMove * maxTransfers) / totalBlocks);
+      } else {
+        // Calculate the replica and ec tasks, then pick blocksToMove if there
+        // is any streams available.
+        numReplicationTasks = (int) Math
+            .ceil((double) (totalReplicateBlocks * maxTransfers) / 
totalBlocks);
+        numECTasks = (int) Math
+            .ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
+        int numTasks = numReplicationTasks + numECTasks;
+        if (numTasks < maxTransfers) {
+          int remainingMaxTransfers = maxTransfers - numTasks;
+          numBlocksToMoveTasks = Math.min(totalBlocksToMove,
+              remainingMaxTransfers);
+        }
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Pending replication tasks: " + numReplicationTasks
-            + " erasure-coded tasks: " + numECTasks);
+            + " erasure-coded tasks: " + numECTasks + " blocks to move tasks: "
+            + numBlocksToMoveTasks);
       }
       // check pending replication tasks
       List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
@@ -1706,6 +1743,23 @@ public class DatanodeManager {
         cmds.add(new BlockECReconstructionCommand(
             DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
       }
+      // check pending block storage movement tasks
+      if (nodeinfo.shouldDropSPSWork()) {
+        cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
+        // Set back to false to indicate that the new value has been sent to 
the
+        // datanode.
+        nodeinfo.setDropSPSWork(false);
+      } else {
+        // Get pending block storage movement tasks
+        BlockMovingInfo[] blkStorageMovementInfos = nodeinfo
+            .getBlocksToMoveStorages(numBlocksToMoveTasks);
+
+        if (blkStorageMovementInfos != null) {
+          cmds.add(new BlockStorageMovementCommand(
+              DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, blockPoolId,
+              Arrays.asList(blkStorageMovementInfos)));
+        }
+      }
     }
 
     // check block invalidation
@@ -1749,24 +1803,6 @@ public class DatanodeManager {
       }
     }
 
-    if (nodeinfo.shouldDropSPSWork()) {
-      cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
-      // Set back to false to indicate that the new value has been sent to the
-      // datanode.
-      nodeinfo.setDropSPSWork(false);
-    }
-
-    // check pending block storage movement tasks
-    BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo
-        .getBlocksToMoveStorages();
-
-    if (blkStorageMovementInfosBatch != null) {
-      cmds.add(new BlockStorageMovementCommand(
-          DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
-          blkStorageMovementInfosBatch.getTrackID(), blockPoolId,
-          blkStorageMovementInfosBatch.getBlockMovingInfo()));
-    }
-
     if (!cmds.isEmpty()) {
       return cmds.toArray(new DatanodeCommand[cmds.size()]);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index d60fb6d..65ab246 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -785,8 +785,7 @@ class BPOfferService {
       LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT");
       BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) 
cmd;
       dn.getStoragePolicySatisfyWorker().processBlockMovingTasks(
-          blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
-          blkSPSCmd.getBlockMovingTasks());
+          blkSPSCmd.getBlockPoolId(), blkSPSCmd.getBlockMovingTasks());
       break;
     case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
       LOG.info("DatanodeCommand action: DNA_DROP_SPS_WORK_COMMAND");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index f537f49..b7beda4 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -50,7 +51,7 @@ import 
org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -513,8 +514,11 @@ class BPServiceActor implements Runnable {
             SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) 
:
             SlowDiskReports.EMPTY_REPORT;
 
-    BlocksStorageMovementResult[] blksMovementResults =
-        getBlocksMovementResults();
+    // Get the blocks storage move attempt finished blocks
+    List<Block> results = dn.getStoragePolicySatisfyWorker()
+        .getBlocksMovementsStatusHandler().getMoveAttemptFinishedBlocks();
+    BlocksStorageMoveAttemptFinished storageMoveAttemptFinishedBlks =
+        getStorageMoveAttemptFinishedBlocks(results);
 
     HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
         reports,
@@ -527,7 +531,7 @@ class BPServiceActor implements Runnable {
         requestBlockReportLease,
         slowPeers,
         slowDisks,
-        blksMovementResults);
+        storageMoveAttemptFinishedBlks);
 
     if (outliersReportDue) {
       // If the report was due and successfully sent, schedule the next one.
@@ -537,20 +541,23 @@ class BPServiceActor implements Runnable {
     // Remove the blocks movement results after successfully transferring
     // to namenode.
     dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
-        .remove(blksMovementResults);
+        .remove(results);
 
     return response;
   }
 
-  private BlocksStorageMovementResult[] getBlocksMovementResults() {
-    List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn
-        .getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
-        .getBlksMovementResults();
-    BlocksStorageMovementResult[] blksMovementResult =
-        new BlocksStorageMovementResult[trackIdVsMovementStatus.size()];
-    trackIdVsMovementStatus.toArray(blksMovementResult);
+  private BlocksStorageMoveAttemptFinished getStorageMoveAttemptFinishedBlocks(
+      List<Block> finishedBlks) {
 
-    return blksMovementResult;
+    if (finishedBlks.isEmpty()) {
+      return null;
+    }
+
+    // Create BlocksStorageMoveAttemptFinished with currently finished
+    // blocks
+    Block[] blockList = new Block[finishedBlks.size()];
+    finishedBlks.toArray(blockList);
+    return new BlocksStorageMoveAttemptFinished(blockList);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index f3d2bb6..b3b9fd9 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -21,14 +21,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import 
org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
+import org.apache.hadoop.hdfs.protocol.Block;
+import 
org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementAttemptFinished;
 import 
org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +41,12 @@ import org.slf4j.LoggerFactory;
 public class BlockStorageMovementTracker implements Runnable {
   private static final Logger LOG = LoggerFactory
       .getLogger(BlockStorageMovementTracker.class);
-  private final CompletionService<BlockMovementResult> moverCompletionService;
+  private final CompletionService<BlockMovementAttemptFinished> 
moverCompletionService;
   private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
 
-  // Keeps the information - trackID vs its list of blocks
-  private final Map<Long, List<Future<BlockMovementResult>>> moverTaskFutures;
-  private final Map<Long, List<BlockMovementResult>> movementResults;
+  // Keeps the information - block vs its list of future move tasks
+  private final Map<Block, List<Future<BlockMovementAttemptFinished>>> 
moverTaskFutures;
+  private final Map<Block, List<BlockMovementAttemptFinished>> movementResults;
 
   private volatile boolean running = true;
 
@@ -59,7 +59,7 @@ public class BlockStorageMovementTracker implements Runnable {
    *          blocks movements status handler
    */
   public BlockStorageMovementTracker(
-      CompletionService<BlockMovementResult> moverCompletionService,
+      CompletionService<BlockMovementAttemptFinished> moverCompletionService,
       BlocksMovementsStatusHandler handler) {
     this.moverCompletionService = moverCompletionService;
     this.moverTaskFutures = new HashMap<>();
@@ -82,32 +82,33 @@ public class BlockStorageMovementTracker implements 
Runnable {
         }
       }
       try {
-        Future<BlockMovementResult> future = moverCompletionService.take();
+        Future<BlockMovementAttemptFinished> future =
+            moverCompletionService.take();
         if (future != null) {
-          BlockMovementResult result = future.get();
+          BlockMovementAttemptFinished result = future.get();
           LOG.debug("Completed block movement. {}", result);
-          long trackId = result.getTrackId();
-          List<Future<BlockMovementResult>> blocksMoving = moverTaskFutures
-              .get(trackId);
+          Block block = result.getBlock();
+          List<Future<BlockMovementAttemptFinished>> blocksMoving =
+              moverTaskFutures.get(block);
           if (blocksMoving == null) {
-            LOG.warn("Future task doesn't exist for trackId " + trackId);
+            LOG.warn("Future task doesn't exist for block : {} ", block);
             continue;
           }
           blocksMoving.remove(future);
 
-          List<BlockMovementResult> resultPerTrackIdList =
-              addMovementResultToTrackIdList(result);
+          List<BlockMovementAttemptFinished> resultPerTrackIdList =
+              addMovementResultToBlockIdList(result);
 
           // Completed all the scheduled blocks movement under this 'trackId'.
-          if (blocksMoving.isEmpty() || moverTaskFutures.get(trackId) == null) 
{
+          if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) {
             synchronized (moverTaskFutures) {
-              moverTaskFutures.remove(trackId);
+              moverTaskFutures.remove(block);
             }
             if (running) {
               // handle completed or inprogress blocks movements per trackId.
               blksMovementsStatusHandler.handle(resultPerTrackIdList);
             }
-            movementResults.remove(trackId);
+            movementResults.remove(block);
           }
         }
       } catch (InterruptedException e) {
@@ -123,38 +124,39 @@ public class BlockStorageMovementTracker implements 
Runnable {
     }
   }
 
-  private List<BlockMovementResult> addMovementResultToTrackIdList(
-      BlockMovementResult result) {
-    long trackId = result.getTrackId();
-    List<BlockMovementResult> perTrackIdList;
+  private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList(
+      BlockMovementAttemptFinished result) {
+    Block block = result.getBlock();
+    List<BlockMovementAttemptFinished> perBlockIdList;
     synchronized (movementResults) {
-      perTrackIdList = movementResults.get(trackId);
-      if (perTrackIdList == null) {
-        perTrackIdList = new ArrayList<>();
-        movementResults.put(trackId, perTrackIdList);
+      perBlockIdList = movementResults.get(block);
+      if (perBlockIdList == null) {
+        perBlockIdList = new ArrayList<>();
+        movementResults.put(block, perBlockIdList);
       }
-      perTrackIdList.add(result);
+      perBlockIdList.add(result);
     }
-    return perTrackIdList;
+    return perBlockIdList;
   }
 
   /**
    * Add future task to the tracking list to check the completion status of the
    * block movement.
    *
-   * @param trackID
-   *          tracking Id
+   * @param blockID
+   *          block identifier
    * @param futureTask
    *          future task used for moving the respective block
    */
-  void addBlock(long trackID, Future<BlockMovementResult> futureTask) {
+  void addBlock(Block block,
+      Future<BlockMovementAttemptFinished> futureTask) {
     synchronized (moverTaskFutures) {
-      List<Future<BlockMovementResult>> futures = moverTaskFutures
-          .get(Long.valueOf(trackID));
+      List<Future<BlockMovementAttemptFinished>> futures =
+          moverTaskFutures.get(block);
       // null for the first task
       if (futures == null) {
         futures = new ArrayList<>();
-        moverTaskFutures.put(trackID, futures);
+        moverTaskFutures.put(block, futures);
       }
       futures.add(futureTask);
       // Notify waiting tracker thread about the newly added tasks.
@@ -175,16 +177,6 @@ public class BlockStorageMovementTracker implements 
Runnable {
   }
 
   /**
-   * @return the list of trackIds which are still waiting to complete all the
-   *         scheduled blocks movements.
-   */
-  Set<Long> getInProgressTrackIds() {
-    synchronized (moverTaskFutures) {
-      return moverTaskFutures.keySet();
-    }
-  }
-
-  /**
    * Sets running flag to false and clear the pending movement result queues.
    */
   public void stopTracking() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 4e57805..47318f8 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
-import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -32,9 +31,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -62,7 +59,6 @@ import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -89,14 +85,11 @@ public class StoragePolicySatisfyWorker {
 
   private final int moverThreads;
   private final ExecutorService moveExecutor;
-  private final CompletionService<BlockMovementResult> moverCompletionService;
+  private final CompletionService<BlockMovementAttemptFinished> 
moverCompletionService;
   private final BlocksMovementsStatusHandler handler;
   private final BlockStorageMovementTracker movementTracker;
   private Daemon movementTrackerThread;
 
-  private long inprogressTrackIdsCheckInterval = 30 * 1000; // 30seconds.
-  private long nextInprogressRecheckTime;
-
   public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
     this.datanode = datanode;
     this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
@@ -111,16 +104,6 @@ public class StoragePolicySatisfyWorker {
     movementTrackerThread = new Daemon(movementTracker);
     movementTrackerThread.setName("BlockStorageMovementTracker");
 
-    // Interval to check that the inprogress trackIds. The time interval is
-    // proportional o the heart beat interval time period.
-    final long heartbeatIntervalSeconds = conf.getTimeDuration(
-        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
-        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
-    inprogressTrackIdsCheckInterval = 5 * heartbeatIntervalSeconds;
-    // update first inprogress recheck time to a future time stamp.
-    nextInprogressRecheckTime = monotonicNow()
-        + inprogressTrackIdsCheckInterval;
-
     // TODO: Needs to manage the number of concurrent moves per DataNode.
   }
 
@@ -186,30 +169,26 @@ public class StoragePolicySatisfyWorker {
    * separate thread. Each task will move the block replica to the target node 
&
    * wait for the completion.
    *
-   * @param trackID
-   *          unique tracking identifier
-   * @param blockPoolID
-   *          block pool ID
+   * @param blockPoolID block pool identifier
+   *
    * @param blockMovingInfos
    *          list of blocks to be moved
    */
-  public void processBlockMovingTasks(long trackID, String blockPoolID,
-      Collection<BlockMovingInfo> blockMovingInfos) {
+  public void processBlockMovingTasks(final String blockPoolID,
+      final Collection<BlockMovingInfo> blockMovingInfos) {
     LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      assert blkMovingInfo.getSources().length == blkMovingInfo
-          .getTargets().length;
-      for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
-        DatanodeInfo target = blkMovingInfo.getTargets()[i];
-        BlockMovingTask blockMovingTask = new BlockMovingTask(
-            trackID, blockPoolID, blkMovingInfo.getBlock(),
-            blkMovingInfo.getSources()[i], target,
-            blkMovingInfo.getSourceStorageTypes()[i],
-            blkMovingInfo.getTargetStorageTypes()[i]);
-        Future<BlockMovementResult> moveCallable = moverCompletionService
-            .submit(blockMovingTask);
-        movementTracker.addBlock(trackID, moveCallable);
-      }
+      StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
+      StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
+      assert sourceStorageType != targetStorageType
+          : "Source and Target storage type shouldn't be same!";
+      BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
+          blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+          blkMovingInfo.getTarget(), sourceStorageType, targetStorageType);
+      Future<BlockMovementAttemptFinished> moveCallable = 
moverCompletionService
+          .submit(blockMovingTask);
+      movementTracker.addBlock(blkMovingInfo.getBlock(),
+          moveCallable);
     }
   }
 
@@ -217,8 +196,7 @@ public class StoragePolicySatisfyWorker {
    * This class encapsulates the process of moving the block replica to the
    * given target and wait for the response.
    */
-  private class BlockMovingTask implements Callable<BlockMovementResult> {
-    private final long trackID;
+  private class BlockMovingTask implements 
Callable<BlockMovementAttemptFinished> {
     private final String blockPoolID;
     private final Block block;
     private final DatanodeInfo source;
@@ -226,10 +204,9 @@ public class StoragePolicySatisfyWorker {
     private final StorageType srcStorageType;
     private final StorageType targetStorageType;
 
-    BlockMovingTask(long trackID, String blockPoolID, Block block,
+    BlockMovingTask(String blockPoolID, Block block,
         DatanodeInfo source, DatanodeInfo target,
         StorageType srcStorageType, StorageType targetStorageType) {
-      this.trackID = trackID;
       this.blockPoolID = blockPoolID;
       this.block = block;
       this.source = source;
@@ -239,23 +216,26 @@ public class StoragePolicySatisfyWorker {
     }
 
     @Override
-    public BlockMovementResult call() {
+    public BlockMovementAttemptFinished call() {
       BlockMovementStatus status = moveBlock();
-      return new BlockMovementResult(trackID, block.getBlockId(), target,
-          status);
+      return new BlockMovementAttemptFinished(block, source, target, status);
     }
 
     private BlockMovementStatus moveBlock() {
       LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
-              + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
+          + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
           block, source, target, srcStorageType, targetStorageType);
       Socket sock = null;
       DataOutputStream out = null;
       DataInputStream in = null;
       try {
+        datanode.incrementXmitsInProgress();
+
         ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
         DNConf dnConf = datanode.getDnConf();
-        String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname());
+
+        String dnAddr = datanode.getDatanodeId()
+            .getXferAddr(dnConf.getConnectToDnViaHostname());
         sock = datanode.newSocket();
         NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr),
             dnConf.getSocketTimeout());
@@ -297,9 +277,10 @@ public class StoragePolicySatisfyWorker {
         LOG.warn(
             "Failed to move block:{} from src:{} to destin:{} to satisfy "
                 + "storageType:{}",
-            block, source, target, targetStorageType, e);
+                block, source, target, targetStorageType, e);
         return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
       } finally {
+        datanode.decrementXmitsInProgress();
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
         IOUtils.closeSocket(sock);
@@ -357,29 +338,25 @@ public class StoragePolicySatisfyWorker {
   }
 
   /**
-   * This class represents result from a block movement task. This will have 
the
+   * This class represents status from a block movement task. This will have 
the
    * information of the task which was successful or failed due to errors.
    */
-  static class BlockMovementResult {
-    private final long trackId;
-    private final long blockId;
+  static class BlockMovementAttemptFinished {
+    private final Block block;
+    private final DatanodeInfo src;
     private final DatanodeInfo target;
     private final BlockMovementStatus status;
 
-    BlockMovementResult(long trackId, long blockId,
+    BlockMovementAttemptFinished(Block block, DatanodeInfo src,
         DatanodeInfo target, BlockMovementStatus status) {
-      this.trackId = trackId;
-      this.blockId = blockId;
+      this.block = block;
+      this.src = src;
       this.target = target;
       this.status = status;
     }
 
-    long getTrackId() {
-      return trackId;
-    }
-
-    long getBlockId() {
-      return blockId;
+    Block getBlock() {
+      return block;
     }
 
     BlockMovementStatus getStatus() {
@@ -388,99 +365,79 @@ public class StoragePolicySatisfyWorker {
 
     @Override
     public String toString() {
-      return new StringBuilder().append("Block movement result(\n  ")
-          .append("track id: ").append(trackId).append(" block id: ")
-          .append(blockId).append(" target node: ").append(target)
+      return new StringBuilder().append("Block movement attempt finished(\n  ")
+          .append(" block : ")
+          .append(block).append(" src node: ").append(src)
+          .append(" target node: ").append(target)
           .append(" movement status: ").append(status).append(")").toString();
     }
   }
 
   /**
    * Blocks movements status handler, which is used to collect details of the
-   * completed or inprogress list of block movements and this status(success or
-   * failure or inprogress) will be send to the namenode via heartbeat.
+   * completed block movements and it will send these attempted finished(with
+   * success or failure) blocks to the namenode via heartbeat.
    */
-  class BlocksMovementsStatusHandler {
-    private final List<BlocksStorageMovementResult> trackIdVsMovementStatus =
+  public static class BlocksMovementsStatusHandler {
+    private final List<Block> blockIdVsMovementStatus =
         new ArrayList<>();
 
     /**
-     * Collect all the block movement results. Later this will be send to
-     * namenode via heart beat.
+     * Collect all the storage movement attempt finished blocks. Later this 
will
+     * be send to namenode via heart beat.
      *
-     * @param results
-     *          result of all the block movements per trackId
+     * @param moveAttemptFinishedBlks
+     *          set of storage movement attempt finished blocks
      */
-    void handle(List<BlockMovementResult> resultsPerTrackId) {
-      BlocksStorageMovementResult.Status status =
-          BlocksStorageMovementResult.Status.SUCCESS;
-      long trackId = -1;
-      for (BlockMovementResult blockMovementResult : resultsPerTrackId) {
-        trackId = blockMovementResult.getTrackId();
-        if (blockMovementResult.status ==
-            BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) {
-          status = BlocksStorageMovementResult.Status.FAILURE;
-          // If any of the block movement is failed, then mark as failure so
-          // that namenode can take a decision to retry the blocks associated 
to
-          // the given trackId.
-          break;
-        }
-      }
+    void handle(List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
+      List<Block> blocks = new ArrayList<>();
 
-      // Adding to the tracking results list. Later this will be send to
+      for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
+        blocks.add(item.getBlock());
+      }
+      // Adding to the tracking report list. Later this will be send to
       // namenode via datanode heartbeat.
-      synchronized (trackIdVsMovementStatus) {
-        trackIdVsMovementStatus.add(
-            new BlocksStorageMovementResult(trackId, status));
+      synchronized (blockIdVsMovementStatus) {
+        blockIdVsMovementStatus.addAll(blocks);
       }
     }
 
     /**
-     * @return unmodifiable list of blocks storage movement results.
+     * @return unmodifiable list of storage movement attempt finished blocks.
      */
-    List<BlocksStorageMovementResult> getBlksMovementResults() {
-      List<BlocksStorageMovementResult> movementResults = new ArrayList<>();
-      // 1. Adding all the completed trackids.
-      synchronized (trackIdVsMovementStatus) {
-        if (trackIdVsMovementStatus.size() > 0) {
-          movementResults = Collections
-              .unmodifiableList(trackIdVsMovementStatus);
+    List<Block> getMoveAttemptFinishedBlocks() {
+      List<Block> moveAttemptFinishedBlks = new ArrayList<>();
+      // 1. Adding all the completed block ids.
+      synchronized (blockIdVsMovementStatus) {
+        if (blockIdVsMovementStatus.size() > 0) {
+          moveAttemptFinishedBlks = Collections
+              .unmodifiableList(blockIdVsMovementStatus);
         }
       }
-      // 2. Adding the in progress track ids after those which are completed.
-      Set<Long> inProgressTrackIds = getInProgressTrackIds();
-      for (Long trackId : inProgressTrackIds) {
-        movementResults.add(new BlocksStorageMovementResult(trackId,
-            BlocksStorageMovementResult.Status.IN_PROGRESS));
-      }
-      return movementResults;
+      return moveAttemptFinishedBlks;
     }
 
     /**
-     * Remove the blocks storage movement results.
+     * Remove the storage movement attempt finished blocks from the tracking
+     * list.
      *
-     * @param results
-     *          set of blocks storage movement results
+     * @param moveAttemptFinishedBlks
+     *          set of storage movement attempt finished blocks
      */
-    void remove(BlocksStorageMovementResult[] results) {
-      if (results != null) {
-        synchronized (trackIdVsMovementStatus) {
-          for (BlocksStorageMovementResult blocksMovementResult : results) {
-            trackIdVsMovementStatus.remove(blocksMovementResult);
-          }
-        }
+    void remove(List<Block> moveAttemptFinishedBlks) {
+      if (moveAttemptFinishedBlks != null) {
+        blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
       }
     }
 
     /**
-     * Clear the trackID vs movement status tracking map.
+     * Clear the blockID vs movement status tracking map.
      */
     void removeAll() {
-      synchronized (trackIdVsMovementStatus) {
-        trackIdVsMovementStatus.clear();
+      synchronized (blockIdVsMovementStatus) {
+        blockIdVsMovementStatus.clear();
       }
     }
-
   }
 
   @VisibleForTesting
@@ -498,23 +455,4 @@ public class StoragePolicySatisfyWorker {
     movementTracker.removeAll();
     handler.removeAll();
   }
-
-  /**
-   * Gets list of trackids which are inprogress. Will do collection 
periodically
-   * on 'dfs.datanode.storage.policy.satisfier.worker.inprogress.recheck.time.
-   * millis' interval.
-   *
-   * @return collection of trackids which are inprogress
-   */
-  private Set<Long> getInProgressTrackIds() {
-    Set<Long> trackIds = new HashSet<>();
-    long now = monotonicNow();
-    if (nextInprogressRecheckTime >= now) {
-      trackIds = movementTracker.getInProgressTrackIds();
-
-      // schedule next re-check interval
-      nextInprogressRecheckTime = now + inprogressTrackIdsCheckInterval;
-    }
-    return trackIds;
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to