[46/50] [abbrv] hadoop git commit: HDFS-13165: [SPS]: Collects successfully moved block details via IBR. Contributed by Rakesh R.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java index 7580ba9..f5225d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java @@ -20,13 +20,10 @@ package org.apache.hadoop.hdfs.server.sps; import java.io.IOException; import java.net.Socket; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -39,7 +36,6 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; @@ -48,15 +44,14 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.balancer.KeyManager; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; +import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher; import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished; import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus; import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker; import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler; -import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher; import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler; import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; @@ -105,12 +100,14 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT, ioFileBufferSize, connectToDnViaHostname); + +startMovementTracker(); } /** * Initializes block movement tracker daemon and starts the thread. */ - public void init() { + private void startMovementTracker() { movementTrackerThread = new Daemon(this.blkMovementTracker); movementTrackerThread.setName("BlockStorageMovementTracker"); movementTrackerThread.start(); @@ -156,24 +153,16 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType()); LOG.debug("Received BlockMovingTask {}", blkMovingInfo); BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo); -Future moveCallable = mCompletionServ -.submit(blockMovingTask); -blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable); +mCompletionServ.submit(blockMovingTask); } private class ExternalBlocksMovementsStatusHandler - extends BlocksMovementsStatusHandler { + implements BlocksMovementsStatusHandler { @Override -public void handle( -List moveAttemptFinishedBlks) { - List blocks = new ArrayList<>(); - for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) { -blocks.add(item.getBlock()); - } - BlocksStorageMoveAttemptFinished blkAttempted = - new BlocksStorageMoveAttemptFinished( - blocks.toArray(new Block[blocks.size()])); - service.notifyStorageMovementAttemptFinishedBlks(blkAttempted); +public void handle(BlockMovementAttemptFinished attemptedMove) { + service.notifyStorageMovementAttemptFinishedBlk( + attemptedMove.getTargetDatanode(), attemptedMove.getTargetType(), + attemptedMove.getBlock()); } } @@ -194,6 +183,7 @@ public class
[46/50] [abbrv] hadoop git commit: HDFS-13165: [SPS]: Collects successfully moved block details via IBR. Contributed by Rakesh R.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4462030/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java index 7580ba9..f5225d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java @@ -20,13 +20,10 @@ package org.apache.hadoop.hdfs.server.sps; import java.io.IOException; import java.net.Socket; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -39,7 +36,6 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; @@ -48,15 +44,14 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.balancer.KeyManager; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; +import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher; import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished; import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus; import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker; import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler; -import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher; import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler; import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; @@ -105,12 +100,14 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT, ioFileBufferSize, connectToDnViaHostname); + +startMovementTracker(); } /** * Initializes block movement tracker daemon and starts the thread. */ - public void init() { + private void startMovementTracker() { movementTrackerThread = new Daemon(this.blkMovementTracker); movementTrackerThread.setName("BlockStorageMovementTracker"); movementTrackerThread.start(); @@ -156,24 +153,16 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType()); LOG.debug("Received BlockMovingTask {}", blkMovingInfo); BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo); -Future moveCallable = mCompletionServ -.submit(blockMovingTask); -blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable); +mCompletionServ.submit(blockMovingTask); } private class ExternalBlocksMovementsStatusHandler - extends BlocksMovementsStatusHandler { + implements BlocksMovementsStatusHandler { @Override -public void handle( -List moveAttemptFinishedBlks) { - List blocks = new ArrayList<>(); - for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) { -blocks.add(item.getBlock()); - } - BlocksStorageMoveAttemptFinished blkAttempted = - new BlocksStorageMoveAttemptFinished( - blocks.toArray(new Block[blocks.size()])); - service.notifyStorageMovementAttemptFinishedBlks(blkAttempted); +public void handle(BlockMovementAttemptFinished attemptedMove) { + service.notifyStorageMovementAttemptFinishedBlk( + attemptedMove.getTargetDatanode(), attemptedMove.getTargetType(), + attemptedMove.getBlock()); } } @@ -194,6 +183,7 @@ public class
[46/50] [abbrv] hadoop git commit: HDFS-13165: [SPS]: Collects successfully moved block details via IBR. Contributed by Rakesh R.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02ebf5d4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java index 7580ba9..f5225d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java @@ -20,13 +20,10 @@ package org.apache.hadoop.hdfs.server.sps; import java.io.IOException; import java.net.Socket; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -39,7 +36,6 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; @@ -48,15 +44,14 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.balancer.KeyManager; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; +import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher; import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished; import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus; import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker; import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler; -import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher; import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler; import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; @@ -105,12 +100,14 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT, ioFileBufferSize, connectToDnViaHostname); + +startMovementTracker(); } /** * Initializes block movement tracker daemon and starts the thread. */ - public void init() { + private void startMovementTracker() { movementTrackerThread = new Daemon(this.blkMovementTracker); movementTrackerThread.setName("BlockStorageMovementTracker"); movementTrackerThread.start(); @@ -156,24 +153,16 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType()); LOG.debug("Received BlockMovingTask {}", blkMovingInfo); BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo); -Future moveCallable = mCompletionServ -.submit(blockMovingTask); -blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable); +mCompletionServ.submit(blockMovingTask); } private class ExternalBlocksMovementsStatusHandler - extends BlocksMovementsStatusHandler { + implements BlocksMovementsStatusHandler { @Override -public void handle( -List moveAttemptFinishedBlks) { - List blocks = new ArrayList<>(); - for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) { -blocks.add(item.getBlock()); - } - BlocksStorageMoveAttemptFinished blkAttempted = - new BlocksStorageMoveAttemptFinished( - blocks.toArray(new Block[blocks.size()])); - service.notifyStorageMovementAttemptFinishedBlks(blkAttempted); +public void handle(BlockMovementAttemptFinished attemptedMove) { + service.notifyStorageMovementAttemptFinishedBlk( + attemptedMove.getTargetDatanode(), attemptedMove.getTargetType(), + attemptedMove.getBlock()); } } @@ -194,6 +183,7 @@ public class