[46/50] [abbrv] hadoop git commit: HDFS-13165: [SPS]: Collects successfully moved block details via IBR. Contributed by Rakesh R.

2018-07-19 Thread rakeshr
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
--
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index 7580ba9..f5225d2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -20,13 +20,10 @@ package org.apache.hadoop.hdfs.server.sps;
 
 import java.io.IOException;
 import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -39,7 +36,6 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
@@ -48,15 +44,14 @@ import 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.balancer.KeyManager;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
 import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
 import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
-import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
@@ -105,12 +100,14 @@ public class ExternalSPSBlockMoveTaskHandler implements 
BlockMoveTaskHandler {
 int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
 blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT,
 ioFileBufferSize, connectToDnViaHostname);
+
+startMovementTracker();
   }
 
   /**
* Initializes block movement tracker daemon and starts the thread.
*/
-  public void init() {
+  private void startMovementTracker() {
 movementTrackerThread = new Daemon(this.blkMovementTracker);
 movementTrackerThread.setName("BlockStorageMovementTracker");
 movementTrackerThread.start();
@@ -156,24 +153,16 @@ public class ExternalSPSBlockMoveTaskHandler implements 
BlockMoveTaskHandler {
 // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
 LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
 BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
-Future moveCallable = mCompletionServ
-.submit(blockMovingTask);
-blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable);
+mCompletionServ.submit(blockMovingTask);
   }
 
   private class ExternalBlocksMovementsStatusHandler
-  extends BlocksMovementsStatusHandler {
+  implements BlocksMovementsStatusHandler {
 @Override
-public void handle(
-List moveAttemptFinishedBlks) {
-  List blocks = new ArrayList<>();
-  for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
-blocks.add(item.getBlock());
-  }
-  BlocksStorageMoveAttemptFinished blkAttempted =
-  new BlocksStorageMoveAttemptFinished(
-  blocks.toArray(new Block[blocks.size()]));
-  service.notifyStorageMovementAttemptFinishedBlks(blkAttempted);
+public void handle(BlockMovementAttemptFinished attemptedMove) {
+  service.notifyStorageMovementAttemptFinishedBlk(
+  attemptedMove.getTargetDatanode(), attemptedMove.getTargetType(),
+  attemptedMove.getBlock());
 }
   }
 
@@ -194,6 +183,7 @@ public class 

[46/50] [abbrv] hadoop git commit: HDFS-13165: [SPS]: Collects successfully moved block details via IBR. Contributed by Rakesh R.

2018-07-16 Thread rakeshr
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4462030/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
--
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index 7580ba9..f5225d2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -20,13 +20,10 @@ package org.apache.hadoop.hdfs.server.sps;
 
 import java.io.IOException;
 import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -39,7 +36,6 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
@@ -48,15 +44,14 @@ import 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.balancer.KeyManager;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
 import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
 import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
-import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
@@ -105,12 +100,14 @@ public class ExternalSPSBlockMoveTaskHandler implements 
BlockMoveTaskHandler {
 int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
 blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT,
 ioFileBufferSize, connectToDnViaHostname);
+
+startMovementTracker();
   }
 
   /**
* Initializes block movement tracker daemon and starts the thread.
*/
-  public void init() {
+  private void startMovementTracker() {
 movementTrackerThread = new Daemon(this.blkMovementTracker);
 movementTrackerThread.setName("BlockStorageMovementTracker");
 movementTrackerThread.start();
@@ -156,24 +153,16 @@ public class ExternalSPSBlockMoveTaskHandler implements 
BlockMoveTaskHandler {
 // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
 LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
 BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
-Future moveCallable = mCompletionServ
-.submit(blockMovingTask);
-blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable);
+mCompletionServ.submit(blockMovingTask);
   }
 
   private class ExternalBlocksMovementsStatusHandler
-  extends BlocksMovementsStatusHandler {
+  implements BlocksMovementsStatusHandler {
 @Override
-public void handle(
-List moveAttemptFinishedBlks) {
-  List blocks = new ArrayList<>();
-  for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
-blocks.add(item.getBlock());
-  }
-  BlocksStorageMoveAttemptFinished blkAttempted =
-  new BlocksStorageMoveAttemptFinished(
-  blocks.toArray(new Block[blocks.size()]));
-  service.notifyStorageMovementAttemptFinishedBlks(blkAttempted);
+public void handle(BlockMovementAttemptFinished attemptedMove) {
+  service.notifyStorageMovementAttemptFinishedBlk(
+  attemptedMove.getTargetDatanode(), attemptedMove.getTargetType(),
+  attemptedMove.getBlock());
 }
   }
 
@@ -194,6 +183,7 @@ public class 

[46/50] [abbrv] hadoop git commit: HDFS-13165: [SPS]: Collects successfully moved block details via IBR. Contributed by Rakesh R.

2018-07-12 Thread rakeshr
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02ebf5d4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
--
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index 7580ba9..f5225d2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -20,13 +20,10 @@ package org.apache.hadoop.hdfs.server.sps;
 
 import java.io.IOException;
 import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -39,7 +36,6 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
@@ -48,15 +44,14 @@ import 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.balancer.KeyManager;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
 import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
 import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
-import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
@@ -105,12 +100,14 @@ public class ExternalSPSBlockMoveTaskHandler implements 
BlockMoveTaskHandler {
 int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
 blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT,
 ioFileBufferSize, connectToDnViaHostname);
+
+startMovementTracker();
   }
 
   /**
* Initializes block movement tracker daemon and starts the thread.
*/
-  public void init() {
+  private void startMovementTracker() {
 movementTrackerThread = new Daemon(this.blkMovementTracker);
 movementTrackerThread.setName("BlockStorageMovementTracker");
 movementTrackerThread.start();
@@ -156,24 +153,16 @@ public class ExternalSPSBlockMoveTaskHandler implements 
BlockMoveTaskHandler {
 // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
 LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
 BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
-Future moveCallable = mCompletionServ
-.submit(blockMovingTask);
-blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable);
+mCompletionServ.submit(blockMovingTask);
   }
 
   private class ExternalBlocksMovementsStatusHandler
-  extends BlocksMovementsStatusHandler {
+  implements BlocksMovementsStatusHandler {
 @Override
-public void handle(
-List moveAttemptFinishedBlks) {
-  List blocks = new ArrayList<>();
-  for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
-blocks.add(item.getBlock());
-  }
-  BlocksStorageMoveAttemptFinished blkAttempted =
-  new BlocksStorageMoveAttemptFinished(
-  blocks.toArray(new Block[blocks.size()]));
-  service.notifyStorageMovementAttemptFinishedBlks(blkAttempted);
+public void handle(BlockMovementAttemptFinished attemptedMove) {
+  service.notifyStorageMovementAttemptFinishedBlk(
+  attemptedMove.getTargetDatanode(), attemptedMove.getTargetType(),
+  attemptedMove.getBlock());
 }
   }
 
@@ -194,6 +183,7 @@ public class