http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java index 7580ba9..f5225d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java @@ -20,13 +20,10 @@ package org.apache.hadoop.hdfs.server.sps; import java.io.IOException; import java.net.Socket; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -39,7 +36,6 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; @@ -48,15 +44,14 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.balancer.KeyManager; import 
org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; +import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher; import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished; import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus; import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker; import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler; -import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher; import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler; import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; @@ -105,12 +100,14 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT, ioFileBufferSize, connectToDnViaHostname); + + startMovementTracker(); } /** * Initializes block movement tracker daemon and starts the thread. 
*/ - public void init() { + private void startMovementTracker() { movementTrackerThread = new Daemon(this.blkMovementTracker); movementTrackerThread.setName("BlockStorageMovementTracker"); movementTrackerThread.start(); @@ -156,24 +153,16 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType()); LOG.debug("Received BlockMovingTask {}", blkMovingInfo); BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo); - Future<BlockMovementAttemptFinished> moveCallable = mCompletionServ - .submit(blockMovingTask); - blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable); + mCompletionServ.submit(blockMovingTask); } private class ExternalBlocksMovementsStatusHandler - extends BlocksMovementsStatusHandler { + implements BlocksMovementsStatusHandler { @Override - public void handle( - List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) { - List<Block> blocks = new ArrayList<>(); - for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) { - blocks.add(item.getBlock()); - } - BlocksStorageMoveAttemptFinished blkAttempted = - new BlocksStorageMoveAttemptFinished( - blocks.toArray(new Block[blocks.size()])); - service.notifyStorageMovementAttemptFinishedBlks(blkAttempted); + public void handle(BlockMovementAttemptFinished attemptedMove) { + service.notifyStorageMovementAttemptFinishedBlk( + attemptedMove.getTargetDatanode(), attemptedMove.getTargetType(), + attemptedMove.getBlock()); } } @@ -194,6 +183,7 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { BlockMovementStatus blkMovementStatus = moveBlock(); return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(), blkMovingInfo.getSource(), blkMovingInfo.getTarget(), + blkMovingInfo.getTargetStorageType(), blkMovementStatus); }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java index 6fc35ea..236b887 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java @@ -86,7 +86,6 @@ public final class ExternalStoragePolicySatisfier { new ExternalBlockMovementListener(); ExternalSPSBlockMoveTaskHandler externalHandler = new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps); - externalHandler.init(); sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler, blkMoveListener); sps.start(true, StoragePolicySatisfierMode.EXTERNAL); @@ -147,7 +146,7 @@ public final class ExternalStoragePolicySatisfier { for (Block block : moveAttemptFinishedBlks) { actualBlockMovements.add(block); } - LOG.info("Movement attempted blocks", actualBlockMovements); + LOG.info("Movement attempted blocks:{}", actualBlockMovements); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 7c35494..baf7ec7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -185,14 +185,6 @@ 
message BlockMovingInfoProto { } /** - * Blocks for which storage movements has been attempted and finished - * with either success or failure. - */ -message BlocksStorageMoveAttemptFinishedProto { - repeated BlockProto blocks = 1; -} - -/** * registration - Information of the datanode registering with the namenode */ message RegisterDatanodeRequestProto { @@ -249,7 +241,6 @@ message HeartbeatRequestProto { optional bool requestFullBlockReportLease = 9 [ default = false ]; repeated SlowPeerReportProto slowPeers = 10; repeated SlowDiskReportProto slowDisks = 11; - optional BlocksStorageMoveAttemptFinishedProto storageMoveAttemptFinishedBlks = 12; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 0f80f97..0b533c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4592,6 +4592,47 @@ </property> <property> + <name>dfs.storage.policy.satisfier.max.outstanding.paths</name> + <value>10000</value> + <description> + Defines the maximum number of paths to satisfy that can be queued up in the + Satisfier call queue in a period of time. Default value is 10000. + </description> +</property> + +<property> + <name>dfs.storage.policy.satisfier.address</name> + <value>0.0.0.0:0</value> + <description> + The hostname used for a keytab based Kerberos login. Keytab based login + is required when dfs.storage.policy.satisfier.mode is external. + </description> +</property> + +<property> + <name>dfs.storage.policy.satisfier.keytab.file</name> + <value></value> + <description> + The keytab file used by external StoragePolicySatisfier to login as its + service principal. 
The principal name is configured with + dfs.storage.policy.satisfier.kerberos.principal. Keytab based login + is required when dfs.storage.policy.satisfier.mode is external. + </description> +</property> + +<property> + <name>dfs.storage.policy.satisfier.kerberos.principal</name> + <value></value> + <description> + The StoragePolicySatisfier principal. This is typically set to + satisfier/_HOST@REALM.TLD. The StoragePolicySatisfier will substitute + _HOST with its own fully qualified hostname at startup. The _HOST placeholder + allows using the same configuration setting on different servers. Keytab + based login is required when dfs.storage.policy.satisfier.mode is external. + </description> +</property> + +<property> <name>dfs.pipeline.ecn</name> <value>false</value> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index f247370..05b6d30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; -import 
org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; @@ -117,8 +116,7 @@ public class TestNameNodePrunesMissingStorages { cluster.stopDataNode(0); cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0, 0, null, true, SlowPeerReports.EMPTY_REPORT, - SlowDiskReports.EMPTY_REPORT, - new BlocksStorageMoveAttemptFinished(null)); + SlowDiskReports.EMPTY_REPORT); // Check that the missing storage was pruned. assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java index d13d717..b453991 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; 
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; @@ -168,8 +167,7 @@ public class InternalDataNodeTestUtils { Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class), - Mockito.any(BlocksStorageMoveAttemptFinished.class))).thenReturn( + Mockito.any(SlowDiskReports.class))).thenReturn( new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat( HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current() .nextLong() | 1L)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java new file mode 100644 index 0000000..b361ce5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished; +import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler; + +/** + * Blocks movements status handler, which is used to collect details of the + * completed block movements and later these attempted finished(with success or + * failure) blocks can be accessed to notify respective listeners, if any. + */ +public class SimpleBlocksMovementsStatusHandler + implements BlocksMovementsStatusHandler { + private final List<Block> blockIdVsMovementStatus = new ArrayList<>(); + + /** + * Collect all the storage movement attempt finished blocks. Later this will + * be send to namenode via heart beat. + * + * @param moveAttemptFinishedBlk + * storage movement attempt finished block + */ + public void handle(BlockMovementAttemptFinished moveAttemptFinishedBlk) { + // Adding to the tracking report list. Later this can be accessed to know + // the attempted block movements. + synchronized (blockIdVsMovementStatus) { + blockIdVsMovementStatus.add(moveAttemptFinishedBlk.getBlock()); + } + } + + /** + * @return unmodifiable list of storage movement attempt finished blocks. + */ + public List<Block> getMoveAttemptFinishedBlocks() { + List<Block> moveAttemptFinishedBlks = new ArrayList<>(); + // 1. Adding all the completed block ids. + synchronized (blockIdVsMovementStatus) { + if (blockIdVsMovementStatus.size() > 0) { + moveAttemptFinishedBlks = Collections + .unmodifiableList(blockIdVsMovementStatus); + } + } + return moveAttemptFinishedBlks; + } + + /** + * Remove the storage movement attempt finished blocks from the tracking list. 
+ * + * @param moveAttemptFinishedBlks + * set of storage movement attempt finished blocks + */ + public void remove(List<Block> moveAttemptFinishedBlks) { + if (moveAttemptFinishedBlks != null) { + blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks); + } + } + + /** + * Clear the blockID vs movement status tracking map. + */ + public void removeAll() { + synchronized (blockIdVsMovementStatus) { + blockIdVsMovementStatus.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 0fa1696..d0c3a83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -124,8 +123,8 @@ public class TestBPOfferService { Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf(); Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")) .when(mockDn).getMetrics(); - Mockito.doReturn(new 
StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn) - .getStoragePolicySatisfyWorker(); + Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn, null)) + .when(mockDn).getStoragePolicySatisfyWorker(); // Set up a simulated dataset with our fake BP mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf)); @@ -160,8 +159,7 @@ public class TestBPOfferService { Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class), - Mockito.any(BlocksStorageMoveAttemptFinished.class)); + Mockito.any(SlowDiskReports.class)); mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0); datanodeCommands[nnIdx] = new DatanodeCommand[0]; return mock; @@ -380,8 +378,8 @@ public class TestBPOfferService { Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf(); Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")). when(mockDn).getMetrics(); - Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn) - .getStoragePolicySatisfyWorker(); + Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn, null)) + .when(mockDn).getStoragePolicySatisfyWorker(); final AtomicInteger count = new AtomicInteger(); Mockito.doAnswer(new Answer<Void>() { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 052eb87..07fd4ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -93,7 +93,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -233,8 +232,7 @@ public class TestBlockRecovery { Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class), - Mockito.any(BlocksStorageMoveAttemptFinished.class))) + Mockito.any(SlowDiskReports.class))) .thenReturn(new HeartbeatResponse( new DatanodeCommand[0], new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1), http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java index 0dd15c3..28427bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import 
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; @@ -173,8 +172,7 @@ public class TestDataNodeLifeline { any(VolumeFailureSummary.class), anyBoolean(), any(SlowPeerReports.class), - any(SlowDiskReports.class), - any(BlocksStorageMoveAttemptFinished.class)); + any(SlowDiskReports.class)); // Intercept lifeline to trigger latch count-down on each call. doAnswer(new LatchCountingAnswer<Void>(lifelinesSent)) @@ -239,8 +237,7 @@ public class TestDataNodeLifeline { any(VolumeFailureSummary.class), anyBoolean(), any(SlowPeerReports.class), - any(SlowDiskReports.class), - any(BlocksStorageMoveAttemptFinished.class)); + any(SlowDiskReports.class)); // While waiting on the latch for the expected number of heartbeat messages, // poll DataNode tracking information. 
We expect that the DataNode always http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java index d47da69..bb1d9ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; @@ -223,8 +222,7 @@ public class TestDatanodeProtocolRetryPolicy { Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class), - Mockito.any(BlocksStorageMoveAttemptFinished.class)); + Mockito.any(SlowDiskReports.class)); dn = new DataNode(conf, locations, null, null) { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index 3732b2e..2dbd5b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -66,7 +66,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.Page import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -209,8 +208,7 @@ public class TestFsDatasetCache { (StorageReport[]) any(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(), anyBoolean(), any(SlowPeerReports.class), - any(SlowDiskReports.class), - any(BlocksStorageMoveAttemptFinished.class)); + any(SlowDiskReports.class)); } finally { lock.writeLock().unlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java index 06a66f7..51d3254 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -35,8 +33,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; @@ -173,8 +171,10 @@ public class TestStoragePolicySatisfyWorker { DatanodeInfo targetDnInfo = DFSTestUtil .getLocalDatanodeInfo(src.getXferPort()); + SimpleBlocksMovementsStatusHandler handler = + new SimpleBlocksMovementsStatusHandler(); StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf, - src); + src, handler); try { worker.start(); List<BlockMovingInfo> blockMovingInfos = new ArrayList<>(); @@ -184,81 +184,19 @@ public class TestStoragePolicySatisfyWorker { blockMovingInfos.add(blockMovingInfo); worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(), blockMovingInfos); - - waitForBlockMovementCompletion(worker, 1, 30000); + waitForBlockMovementCompletion(handler, 1, 30000); } finally { worker.stop(); } } - /** - * Tests that drop SPS work method clears all the queues. 
- * - * @throws Exception - */ - @Test(timeout = 120000) - public void testDropSPSWork() throws Exception { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(20).build(); - - cluster.waitActive(); - final DistributedFileSystem dfs = cluster.getFileSystem(); - final String file = "/testDropSPSWork"; - DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 50 * 100, - DEFAULT_BLOCK_SIZE, (short) 2, 0, false, null); - - // move to ARCHIVE - dfs.setStoragePolicy(new Path(file), "COLD"); - - DataNode src = cluster.getDataNodes().get(2); - DatanodeInfo targetDnInfo = - DFSTestUtil.getLocalDatanodeInfo(src.getXferPort()); - - StoragePolicySatisfyWorker worker = - new StoragePolicySatisfyWorker(conf, src); - worker.start(); - try { - List<BlockMovingInfo> blockMovingInfos = new ArrayList<>(); - List<LocatedBlock> locatedBlocks = - dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks(); - for (LocatedBlock locatedBlock : locatedBlocks) { - BlockMovingInfo blockMovingInfo = - prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(), - locatedBlock.getLocations()[0], targetDnInfo, - locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE); - blockMovingInfos.add(blockMovingInfo); - } - worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(), - blockMovingInfos); - // Wait till results queue build up - waitForBlockMovementResult(worker, 30000); - worker.dropSPSWork(); - assertTrue(worker.getBlocksMovementsStatusHandler() - .getMoveAttemptFinishedBlocks().size() == 0); - } finally { - worker.stop(); - } - } - - private void waitForBlockMovementResult( - final StoragePolicySatisfyWorker worker, int timeout) throws Exception { - GenericTestUtils.waitFor(new Supplier<Boolean>() { - @Override - public Boolean get() { - List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler() - .getMoveAttemptFinishedBlocks(); - return completedBlocks.size() > 0; - } - }, 100, timeout); - } - private void waitForBlockMovementCompletion( - final 
StoragePolicySatisfyWorker worker, + final SimpleBlocksMovementsStatusHandler handler, int expectedFinishedItemsCount, int timeout) throws Exception { GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { - List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler() - .getMoveAttemptFinishedBlocks(); + List<Block> completedBlocks = handler.getMoveAttemptFinishedBlocks(); int finishedCount = completedBlocks.size(); LOG.info("Block movement completed count={}, expected={} and actual={}", completedBlocks.size(), expectedFinishedItemsCount, finishedCount); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java index 20402f2..5f62ddb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; @@ -111,8 +110,7 @@ public class TestStorageReport { anyLong(), anyLong(), anyInt(), anyInt(), anyInt(), Mockito.any(VolumeFailureSummary.class), 
Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class), - Mockito.any(BlocksStorageMoveAttemptFinished.class)); + Mockito.any(SlowDiskReports.class)); StorageReport[] reports = captor.getValue(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index ec00ae7..3a3c471 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -56,7 +56,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -957,8 +956,8 @@ public class NNThroughputBenchmark implements Tool { DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) }; DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep, 0L, 0L, 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, - new BlocksStorageMoveAttemptFinished(null)).getCommands(); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT) + .getCommands(); if(cmds != null) { for (DatanodeCommand 
cmd : cmds ) { if(LOG.isDebugEnabled()) { @@ -1008,8 +1007,8 @@ public class NNThroughputBenchmark implements Tool { false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) }; DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep, 0L, 0L, 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, - new BlocksStorageMoveAttemptFinished(null)).getCommands(); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT) + .getCommands(); if (cmds != null) { for (DatanodeCommand cmd : cmds) { if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index 899bb82..b85527a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; @@ -131,8 +130,7 @@ public class NameNodeAdapter { return namesystem.handleHeartbeat(nodeReg, 
BlockManagerTestUtil.getStorageReportsForDatanode(dd), dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, - new BlocksStorageMoveAttemptFinished(null)); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); } public static boolean setReplication(final FSNamesystem ns, http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index 65628b9..df74107 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -140,8 +139,8 @@ public class TestDeadDatanode { false, 0, 0, 0, 0, 0) }; DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, - new BlocksStorageMoveAttemptFinished(null)).getCommands(); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT) + 
.getCommands(); assertEquals(1, cmd.length); assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER .getAction()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java index 47ea39f..ee0b2e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.test.GenericTestUtils; @@ -250,10 +251,9 @@ public class TestNameNodeReconfigure { StoragePolicySatisfierMode.INTERNAL.toString()); // Since DFS_STORAGE_POLICY_ENABLED_KEY is disabled, SPS can't be enabled. 
- assertEquals("SPS shouldn't start as " - + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled", false, - nameNode.getNamesystem().getBlockManager().getSPSManager() - .isInternalSatisfierRunning()); + assertNull("SPS shouldn't start as " + + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled", + nameNode.getNamesystem().getBlockManager().getSPSManager()); verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, StoragePolicySatisfierMode.INTERNAL, false); @@ -352,9 +352,12 @@ public class TestNameNodeReconfigure { void verifySPSEnabled(final NameNode nameNode, String property, StoragePolicySatisfierMode expected, boolean isSatisfierRunning) { - assertEquals(property + " has wrong value", isSatisfierRunning, nameNode - .getNamesystem().getBlockManager().getSPSManager() - .isInternalSatisfierRunning()); + StoragePolicySatisfyManager spsMgr = nameNode + .getNamesystem().getBlockManager().getSPSManager(); + boolean isInternalSatisfierRunning = spsMgr != null + ? 
spsMgr.isInternalSatisfierRunning() : false; + assertEquals(property + " has wrong value", isSatisfierRunning, + isInternalSatisfierRunning); String actual = nameNode.getConf().get(property, DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT); assertEquals(property + " has wrong value", expected, http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java index 29af885..ed1fe92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java @@ -22,13 +22,18 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.StorageTypeNodePair; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -92,14 
+97,16 @@ public class TestBlockStorageMovementAttemptedItems { */ @Test(timeout = 30000) public void testAddReportedMoveAttemptFinishedBlocks() throws Exception { - bsmAttemptedItems.start(); // start block movement result monitor thread Long item = new Long(1234); - List<Block> blocks = new ArrayList<Block>(); - blocks.add(new Block(item)); - bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0)); - Block[] blockArray = new Block[blocks.size()]; - blocks.toArray(blockArray); - bsmAttemptedItems.notifyMovementTriedBlocks(blockArray); + Block block = new Block(item); + DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867); + Set<StorageTypeNodePair> locs = new HashSet<>(); + locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo)); + Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>(); + blocksMap.put(block, locs); + bsmAttemptedItems.add(0L, 0L, 0L, blocksMap, 0); + bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE, + block); assertEquals("Failed to receive result!", 1, bsmAttemptedItems.getMovementFinishedBlocksCount()); } @@ -111,9 +118,13 @@ public class TestBlockStorageMovementAttemptedItems { public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception { bsmAttemptedItems.start(); // start block movement report monitor thread Long item = new Long(1234); - List<Block> blocks = new ArrayList<>(); - blocks.add(new Block(item)); - bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0)); + Block block = new Block(item); + DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867); + Set<StorageTypeNodePair> locs = new HashSet<>(); + locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo)); + Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>(); + blocksMap.put(block, locs); + bsmAttemptedItems.add(0L, 0L, 0L, blocksMap, 0); assertEquals("Shouldn't receive result", 0, bsmAttemptedItems.getMovementFinishedBlocksCount()); assertEquals("Item doesn't exist 
in the attempted list", 1, @@ -129,15 +140,18 @@ public class TestBlockStorageMovementAttemptedItems { @Test(timeout = 30000) public void testPartialBlockMovementShouldBeRetried1() throws Exception { Long item = new Long(1234); - List<Block> blocks = new ArrayList<>(); - blocks.add(new Block(item)); - blocks.add(new Block(5678L)); + Block block1 = new Block(item); + Block block2 = new Block(5678L); Long trackID = 0L; - bsmAttemptedItems - .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0)); - Block[] blksMovementReport = new Block[1]; - blksMovementReport[0] = new Block(item); - bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport); + DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867); + Set<StorageTypeNodePair> locs = new HashSet<>(); + locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo)); + Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>(); + blocksMap.put(block1, locs); + blocksMap.put(block2, locs); + bsmAttemptedItems.add(trackID, trackID, 0L, blocksMap, 0); + bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE, + block1); // start block movement report monitor thread bsmAttemptedItems.start(); @@ -155,14 +169,16 @@ public class TestBlockStorageMovementAttemptedItems { @Test(timeout = 30000) public void testPartialBlockMovementShouldBeRetried2() throws Exception { Long item = new Long(1234); + Block block = new Block(item); Long trackID = 0L; - List<Block> blocks = new ArrayList<>(); - blocks.add(new Block(item)); - bsmAttemptedItems - .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0)); - Block[] blksMovementReport = new Block[1]; - blksMovementReport[0] = new Block(item); - bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport); + DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867); + Set<StorageTypeNodePair> locs = new HashSet<>(); + locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo)); + Map<Block, Set<StorageTypeNodePair>> blocksMap = new 
HashMap<>(); + blocksMap.put(block, locs); + bsmAttemptedItems.add(trackID, trackID, 0L, blocksMap, 0); + bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE, + block); Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out @@ -183,14 +199,16 @@ public class TestBlockStorageMovementAttemptedItems { public void testPartialBlockMovementWithEmptyAttemptedQueue() throws Exception { Long item = new Long(1234); + Block block = new Block(item); Long trackID = 0L; - List<Block> blocks = new ArrayList<>(); - blocks.add(new Block(item)); - bsmAttemptedItems - .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0)); - Block[] blksMovementReport = new Block[1]; - blksMovementReport[0] = new Block(item); - bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport); + DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867); + Set<StorageTypeNodePair> locs = new HashSet<>(); + locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo)); + Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>(); + blocksMap.put(block, locs); + bsmAttemptedItems.add(trackID, trackID, 0L, blocksMap, 0); + bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE, + block); assertFalse( "Should not add in queue again if it is not there in" + " storageMovementAttemptedItems", http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java index 75aeb86..b05717a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.client.HdfsAdmin; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -107,6 +108,8 @@ public class TestStoragePolicySatisfier { public static final long CAPACITY = 2 * 256 * 1024 * 1024; public static final String FILE = "/testMoveToSatisfyStoragePolicy"; public static final int DEFAULT_BLOCK_SIZE = 1024; + private ExternalBlockMovementListener blkMoveListener = + new ExternalBlockMovementListener(); /** * Sets hdfs cluster. @@ -1029,6 +1032,9 @@ public class TestStoragePolicySatisfier { config.set(DFSConfigKeys .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, "3000"); + config.set(DFSConfigKeys + .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY, + "5000"); StorageType[][] newtypes = new StorageType[][] { {StorageType.ARCHIVE, StorageType.DISK}, {StorageType.ARCHIVE, StorageType.DISK}, @@ -1072,6 +1078,9 @@ public class TestStoragePolicySatisfier { config.set(DFSConfigKeys .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, "3000"); + config.set(DFSConfigKeys + .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY, + "5000"); StorageType[][] newtypes = new StorageType[][] { {StorageType.ARCHIVE, StorageType.DISK}, {StorageType.ARCHIVE, StorageType.DISK}, @@ -1089,7 +1098,7 @@ public class TestStoragePolicySatisfier { fs.setStoragePolicy(filePath, "COLD"); fs.satisfyStoragePolicy(filePath); DFSTestUtil.waitExpectedStorageType(filePath.toString(), - StorageType.ARCHIVE, 3, 30000, hdfsCluster.getFileSystem()); + StorageType.ARCHIVE, 3, 60000, 
hdfsCluster.getFileSystem()); assertFalse("Log output does not contain expected log message: ", logs.getOutput().contains("some of the blocks are low redundant")); } finally { @@ -1425,6 +1434,9 @@ public class TestStoragePolicySatisfier { config.set(DFSConfigKeys .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, "3000"); + config.set(DFSConfigKeys + .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY, + "5000"); config.setBoolean(DFSConfigKeys .DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY, false); @@ -1467,7 +1479,7 @@ public class TestStoragePolicySatisfier { for (int i = 1; i <= 10; i++) { Path filePath = new Path("/file" + i); DFSTestUtil.waitExpectedStorageType(filePath.toString(), - StorageType.DISK, 4, 30000, hdfsCluster.getFileSystem()); + StorageType.DISK, 4, 60000, hdfsCluster.getFileSystem()); } for (int i = 11; i <= 20; i++) { Path filePath = new Path("/file" + i); @@ -1725,20 +1737,16 @@ public class TestStoragePolicySatisfier { public void waitForBlocksMovementAttemptReport( long expectedMovementFinishedBlocksCount, int timeout) throws TimeoutException, InterruptedException { - BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier<Long> sps = - (StoragePolicySatisfier<Long>) blockManager - .getSPSManager().getInternalSPSService(); + Assert.assertNotNull("Didn't set external block move listener", + blkMoveListener); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { + int actualCount = blkMoveListener.getActualBlockMovements().size(); LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", expectedMovementFinishedBlocksCount, - ((BlockStorageMovementAttemptedItems<Long>) (sps - .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount()); - return ((BlockStorageMovementAttemptedItems<Long>) (sps - .getAttemptedItemsMonitor())) - .getMovementFinishedBlocksCount() + actualCount); + return actualCount >= 
expectedMovementFinishedBlocksCount; } }, 100, timeout); @@ -1790,11 +1798,54 @@ public class TestStoragePolicySatisfier { .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn) .storageTypes(storageTypes).storageCapacities(capacities).build(); cluster.waitActive(); + + // Sets external listener for assertion. + blkMoveListener.clear(); + BlockManager blockManager = cluster.getNamesystem().getBlockManager(); + final StoragePolicySatisfier<Long> sps = + (StoragePolicySatisfier<Long>) blockManager + .getSPSManager().getInternalSPSService(); + sps.setBlockMovementListener(blkMoveListener); return cluster; } public void restartNamenode() throws IOException { hdfsCluster.restartNameNodes(); hdfsCluster.waitActive(); + BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); + StoragePolicySatisfyManager spsMgr = blockManager.getSPSManager(); + if (spsMgr != null && spsMgr.isInternalSatisfierRunning()) { + // Sets external listener for assertion. + blkMoveListener.clear(); + final StoragePolicySatisfier<Long> sps = + (StoragePolicySatisfier<Long>) spsMgr.getInternalSPSService(); + sps.setBlockMovementListener(blkMoveListener); + } + } + + /** + * Implementation of listener callback, where it collects all the sps move + * attempted blocks for assertion. 
+ */ + public static final class ExternalBlockMovementListener + implements BlockMovementListener { + + private List<Block> actualBlockMovements = new ArrayList<>(); + + @Override + public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { + for (Block block : moveAttemptFinishedBlks) { + actualBlockMovements.add(block); + } + LOG.info("Movement attempted blocks:{}", actualBlockMovements); + } + + public List<Block> getActualBlockMovements() { + return actualBlockMovements; + } + + public void clear() { + actualBlockMovements.clear(); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java index e69a833..857bd6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier.ExternalBlockMovementListener; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Before; @@ -70,6 +71,8 @@ public class TestStoragePolicySatisfierWithStripedFile { private int 
cellSize; private int defaultStripeBlockSize; private Configuration conf; + private ExternalBlockMovementListener blkMoveListener = + new ExternalBlockMovementListener(); private ErasureCodingPolicy getEcPolicy() { return StripedFileTestUtil.getDefaultECPolicy(); @@ -131,6 +134,15 @@ public class TestStoragePolicySatisfierWithStripedFile { HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); try { cluster.waitActive(); + + // Sets external listener for assertion. + blkMoveListener.clear(); + BlockManager blockManager = cluster.getNamesystem().getBlockManager(); + final StoragePolicySatisfier<Long> sps = + (StoragePolicySatisfier<Long>) blockManager + .getSPSManager().getInternalSPSService(); + sps.setBlockMovementListener(blkMoveListener); + DistributedFileSystem dfs = cluster.getFileSystem(); dfs.enableErasureCodingPolicy( StripedFileTestUtil.getDefaultECPolicy().getName()); @@ -240,6 +252,15 @@ public class TestStoragePolicySatisfierWithStripedFile { HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); try { cluster.waitActive(); + + // Sets external listener for assertion. 
+ blkMoveListener.clear(); + BlockManager blockManager = cluster.getNamesystem().getBlockManager(); + final StoragePolicySatisfier<Long> sps = + (StoragePolicySatisfier<Long>) blockManager + .getSPSManager().getInternalSPSService(); + sps.setBlockMovementListener(blkMoveListener); + DistributedFileSystem dfs = cluster.getFileSystem(); dfs.enableErasureCodingPolicy( StripedFileTestUtil.getDefaultECPolicy().getName()); @@ -328,6 +349,9 @@ public class TestStoragePolicySatisfierWithStripedFile { conf.set(DFSConfigKeys .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, "3000"); + conf.set(DFSConfigKeys + .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY, + "5000"); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(numOfDatanodes) .storagesPerDatanode(storagesPerDatanode) @@ -559,22 +583,16 @@ public class TestStoragePolicySatisfierWithStripedFile { private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster, long expectedMoveFinishedBlks, int timeout) throws TimeoutException, InterruptedException { - BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier<Long> sps = - (StoragePolicySatisfier<Long>) blockManager - .getSPSManager().getInternalSPSService(); - Assert.assertNotNull("Failed to get SPS object reference!", sps); - + Assert.assertNotNull("Didn't set external block move listener", + blkMoveListener); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { + int actualCount = blkMoveListener.getActualBlockMovements().size(); LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", expectedMoveFinishedBlks, - ((BlockStorageMovementAttemptedItems<Long>) sps - .getAttemptedItemsMonitor()).getMovementFinishedBlocksCount()); - return ((BlockStorageMovementAttemptedItems<Long>) sps - .getAttemptedItemsMonitor()) - .getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks; + actualCount); + return actualCount >= 
expectedMoveFinishedBlks; } }, 100, timeout); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e2f0189/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java index 28e172a..be243cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -54,11 +54,9 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener; import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier; @@ -92,6 +90,8 @@ public class TestExternalStoragePolicySatisfier private File baseDir; private StoragePolicySatisfier<String> externalSps; private ExternalSPSContext externalCtxt; + private ExternalBlockMovementListener blkMoveListener = + new ExternalBlockMovementListener(); @After public void destroy() throws Exception { @@ -144,15 +144,12 @@ public class 
TestExternalStoragePolicySatisfier nnc = getNameNodeConnector(getConf()); externalSps = new StoragePolicySatisfier<String>(getConf()); - externalCtxt = new ExternalSPSContext(externalSps, - getNameNodeConnector(conf)); + externalCtxt = new ExternalSPSContext(externalSps, nnc); - ExternalBlockMovementListener blkMoveListener = - new ExternalBlockMovementListener(); + blkMoveListener.clear(); ExternalSPSBlockMoveTaskHandler externalHandler = new ExternalSPSBlockMoveTaskHandler(conf, nnc, externalSps); - externalHandler.init(); externalSps.init(externalCtxt, new ExternalSPSFilePathCollector(externalSps), externalHandler, blkMoveListener); @@ -169,33 +166,17 @@ public class TestExternalStoragePolicySatisfier getCluster().waitActive(); externalSps = new StoragePolicySatisfier<>(getConf()); - externalCtxt = new ExternalSPSContext(externalSps, - getNameNodeConnector(getConf())); - ExternalBlockMovementListener blkMoveListener = - new ExternalBlockMovementListener(); + externalCtxt = new ExternalSPSContext(externalSps, nnc); + blkMoveListener.clear(); ExternalSPSBlockMoveTaskHandler externalHandler = new ExternalSPSBlockMoveTaskHandler(getConf(), nnc, externalSps); - externalHandler.init(); externalSps.init(externalCtxt, new ExternalSPSFilePathCollector(externalSps), externalHandler, blkMoveListener); externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL); } - private class ExternalBlockMovementListener implements BlockMovementListener { - - private List<Block> actualBlockMovements = new ArrayList<>(); - - @Override - public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { - for (Block block : moveAttemptFinishedBlks) { - actualBlockMovements.add(block); - } - LOG.info("Movement attempted blocks", actualBlockMovements); - } - } - private NameNodeConnector getNameNodeConnector(Configuration conf) throws IOException { final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf); @@ -237,16 +218,15 @@ public class 
TestExternalStoragePolicySatisfier public void waitForBlocksMovementAttemptReport( long expectedMovementFinishedBlocksCount, int timeout) throws TimeoutException, InterruptedException { + Assert.assertNotNull("Didn't set external block move listener", + blkMoveListener); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { + int actualCount = blkMoveListener.getActualBlockMovements().size(); LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", - expectedMovementFinishedBlocksCount, - ((BlockStorageMovementAttemptedItems<String>) (externalSps - .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount()); - return ((BlockStorageMovementAttemptedItems<String>) (externalSps - .getAttemptedItemsMonitor())) - .getMovementFinishedBlocksCount() + expectedMovementFinishedBlocksCount, actualCount); + return actualCount >= expectedMovementFinishedBlocksCount; } }, 100, timeout); @@ -352,6 +332,8 @@ public class TestExternalStoragePolicySatisfier files.add(FILE); DistributedFileSystem fs = getFS(); + // stops sps to make the SPS Q with many outstanding requests. + externalSps.stopGracefully(); // Creates 4 more files. Send all of them for satisfying the storage // policy together. for (int i = 0; i < 3; i++) { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org