This is an automated email from the ASF dual-hosted git repository. nanda pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push: new f8a62d6 HDDS-3810. Add the logic to distribute open containers among the pipelines of a datanode. (#1274) f8a62d6 is described below commit f8a62d6d506974b5ee2df64bdc2ea3aea6ff7610 Author: bshashikant <shashik...@apache.org> AuthorDate: Mon Oct 5 11:02:22 2020 +0530 HDDS-3810. Add the logic to distribute open containers among the pipelines of a datanode. (#1274) --- .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 4 ++ .../common/src/main/resources/ozone-default.xml | 7 ++ .../hdds/scm/container/SCMContainerManager.java | 11 +++- .../apache/hadoop/hdds/scm/node/DatanodeInfo.java | 33 ++++++++++ .../apache/hadoop/hdds/scm/node/NodeManager.java | 6 +- .../hadoop/hdds/scm/node/SCMNodeManager.java | 53 +++++++++++++++- .../hadoop/hdds/scm/pipeline/PipelineManager.java | 4 +- .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 7 +- .../hdds/scm/pipeline/RatisPipelineProvider.java | 6 +- .../hdds/scm/pipeline/SCMPipelineManager.java | 16 ++++- .../hadoop/hdds/scm/block/TestBlockManager.java | 69 ++++++++++++++++++++ .../hadoop/hdds/scm/container/MockNodeManager.java | 28 +++++++- .../hadoop/hdds/scm/node/TestSCMNodeManager.java | 4 +- .../scm/pipeline/TestPipelinePlacementPolicy.java | 4 +- .../hdds/scm/pipeline/TestSCMPipelineManager.java | 74 ++++++++++++++++++++++ .../testutils/ReplicationNodeManagerMock.java | 12 +++- .../hadoop/ozone/fsck/TestContainerMapper.java | 3 + 17 files changed, 319 insertions(+), 22 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 672b440..540d2c0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -313,6 +313,10 @@ public final class ScmConfigKeys { 
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY = "ozone.scm.keyvalue.container.deletion-choosing.policy"; + public static final String OZONE_SCM_PIPELINE_PER_METADATA_VOLUME = + "ozone.scm.pipeline.per.metadata.disk"; + + public static final int OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT = 2; // Max timeout for pipeline to stay at ALLOCATED state before scrubbed. public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT = "ozone.scm.pipeline.allocated.timeout"; diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index f16ff3f..70d35a3 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -822,6 +822,13 @@ </description> </property> <property> + <name>ozone.scm.pipeline.per.metadata.disk</name> + <value>2</value> + <tag>OZONE, SCM, PIPELINE</tag> + <description>Number of pipelines to be created per raft log disk. + </description> + </property> + <property> <name>ozone.datanode.pipeline.limit</name> <value>2</value> <tag>OZONE, SCM, PIPELINE</tag> diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java index 5857c4c..fa286a2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -107,6 +107,14 @@ public class SCMContainerManager implements ContainerManager { scmContainerManagerMetrics = SCMContainerManagerMetrics.create(); } + private int getOpenContainerCountPerPipeline(Pipeline pipeline) { + int minContainerCountPerDn = numContainerPerVolume * + pipelineManager.minHealthyVolumeNum(pipeline); + int minPipelineCountPerDn = pipelineManager.minPipelineLimit(pipeline); + return (int) 
Math.ceil( + ((double) minContainerCountPerDn / minPipelineCountPerDn)); + } + private void loadExistingContainers() throws IOException { TableIterator<ContainerID, ? extends KeyValue<ContainerID, ContainerInfo>> @@ -440,8 +448,7 @@ public class SCMContainerManager implements ContainerManager { synchronized (pipeline) { containerIDs = getContainersForOwner(pipeline, owner); - if (containerIDs.size() < numContainerPerVolume * pipelineManager. - getNumHealthyVolumes(pipeline)) { + if (containerIDs.size() < getOpenContainerCountPerPipeline(pipeline)) { containerInfo = containerStateManager.allocateContainer( pipelineManager, owner, pipeline); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java index b39440f..2e7bdeb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto; import org.apache.hadoop.util.Time; import java.util.Collections; @@ -40,6 +42,7 @@ public class DatanodeInfo extends DatanodeDetails { private long lastStatsUpdatedTime; private List<StorageReportProto> storageReports; + private List<MetadataStorageReportProto> metadataStorageReports; /** * Constructs DatanodeInfo from DatanodeDetails. 
@@ -51,6 +54,7 @@ public class DatanodeInfo extends DatanodeDetails { this.lock = new ReentrantReadWriteLock(); this.lastHeartbeatTime = Time.monotonicNow(); this.storageReports = Collections.emptyList(); + this.metadataStorageReports = Collections.emptyList(); } /** @@ -95,6 +99,22 @@ public class DatanodeInfo extends DatanodeDetails { } /** + * Updates the datanode metadata storage reports. + * + * @param reports list of metadata storage report + */ + public void updateMetaDataStorageReports( + List<MetadataStorageReportProto> reports) { + try { + lock.writeLock().lock(); + lastStatsUpdatedTime = Time.monotonicNow(); + metadataStorageReports = reports; + } finally { + lock.writeLock().unlock(); + } + } + + /** * Returns the storage reports associated with this datanode. * * @return list of storage report @@ -122,6 +142,19 @@ public class DatanodeInfo extends DatanodeDetails { } /** + * Returns count of healthy metadata volumes reported from datanode. + * @return count of healthy metdata log volumes + */ + public int getMetaDataVolumeCount() { + try { + lock.readLock().lock(); + return metadataStorageReports.size(); + } finally { + lock.readLock().unlock(); + } + } + + /** * Returns count of failed volumes reported from datanode. 
* @return count of failed volumes */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index df21b84..4af2357 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -214,5 +214,9 @@ public interface NodeManager extends StorageContainerNodeProtocol, */ NetworkTopology getClusterNetworkTopologyMap(); - int getNumHealthyVolumes(List <DatanodeDetails> dnList); + int minHealthyVolumeNum(List <DatanodeDetails> dnList); + + int pipelineLimit(DatanodeDetails dn); + + int minPipelineLimit(List<DatanodeDetails> dn); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 1a0cec3..328f271 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -103,6 +104,8 @@ public class SCMNodeManager implements NodeManager { private final boolean useHostname; private final 
ConcurrentHashMap<String, Set<String>> dnsToUuidMap = new ConcurrentHashMap<>(); + private final int numPipelinesPerMetadataVolume; + private final int heavyNodeCriteria; /** * Constructs SCM machine Manager. @@ -130,6 +133,11 @@ public class SCMNodeManager implements NodeManager { this.useHostname = conf.getBoolean( DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME, DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); + this.numPipelinesPerMetadataVolume = + conf.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME, + ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT); + String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT); + this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit); } private void registerMXBean() { @@ -363,6 +371,8 @@ public class SCMNodeManager implements NodeManager { DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails); if (nodeReport != null) { datanodeInfo.updateStorageReports(nodeReport.getStorageReportList()); + datanodeInfo.updateMetaDataStorageReports(nodeReport. + getMetadataStorageReportList()); metrics.incNumNodeReportProcessed(); } } catch (NodeNotFoundException e) { @@ -511,11 +521,11 @@ public class SCMNodeManager implements NodeManager { } /** - * Returns the max of no healthy volumes reported out of the set + * Returns the min of no healthy volumes reported out of the set * of datanodes constituting the pipeline. */ @Override - public int getNumHealthyVolumes(List<DatanodeDetails> dnList) { + public int minHealthyVolumeNum(List<DatanodeDetails> dnList) { List<Integer> volumeCountList = new ArrayList<>(dnList.size()); for (DatanodeDetails dn : dnList) { try { @@ -527,7 +537,44 @@ public class SCMNodeManager implements NodeManager { } } Preconditions.checkArgument(!volumeCountList.isEmpty()); - return Collections.max(volumeCountList); + return Collections.min(volumeCountList); + } + + /** + * Returns the pipeline limit for the datanode. 
+ * if the datanode pipeline limit is set, consider that as the max + * pipeline limit. + * In case, the pipeline limit is not set, the max pipeline limit + * will be based on the no of raft log volume reported and provided + * that it has atleast one healthy data volume. + */ + @Override + public int pipelineLimit(DatanodeDetails dn) { + try { + if (heavyNodeCriteria > 0) { + return heavyNodeCriteria; + } else if (nodeStateManager.getNode(dn).getHealthyVolumeCount() > 0) { + return numPipelinesPerMetadataVolume * + nodeStateManager.getNode(dn).getMetaDataVolumeCount(); + } + } catch (NodeNotFoundException e) { + LOG.warn("Cannot generate NodeStat, datanode {} not found.", + dn.getUuid()); + } + return 0; + } + + /** + * Returns the pipeline limit for set of datanodes. + */ + @Override + public int minPipelineLimit(List<DatanodeDetails> dnList) { + List<Integer> pipelineCountList = new ArrayList<>(dnList.size()); + for (DatanodeDetails dn : dnList) { + pipelineCountList.add(pipelineLimit(dn)); + } + Preconditions.checkArgument(!pipelineCountList.isEmpty()); + return Collections.min(pipelineCountList); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 857f76e..0cb905e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -89,7 +89,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean, void incNumBlocksAllocatedMetric(PipelineID id); - int getNumHealthyVolumes(Pipeline pipeline); + int minHealthyVolumeNum(Pipeline pipeline); + + int minPipelineLimit(Pipeline pipeline); /** * Activates a dormant pipeline. 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java index 84efdc2..b9441be 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java @@ -75,9 +75,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { this.nodeManager = nodeManager; this.conf = conf; this.stateManager = stateManager; - this.heavyNodeCriteria = conf.getInt( - ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, - ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT); + String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT); + this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit); } int currentPipelineCount(DatanodeDetails datanodeDetails, int nodesRequired) { @@ -149,7 +148,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { .map(d -> new DnWithPipelines(d, currentPipelineCount(d, nodesRequired))) .filter(d -> - ((d.getPipelines() < heavyNodeCriteria) || heavyNodeCriteria == 0)) + (d.getPipelines() < nodeManager.pipelineLimit(d.getDn()))) .sorted(Comparator.comparingInt(DnWithPipelines::getPipelines)) .map(d -> d.getDn()) .collect(Collectors.toList()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index 4d91541..e39f141 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -64,9 +64,9 @@ public class RatisPipelineProvider extends PipelineProvider { 
this.pipelineNumberLimit = conf.getInt( ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT); - this.maxPipelinePerDatanode = conf.getInt( - ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, - ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT); + String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT); + this.maxPipelinePerDatanode = dnLimit == null ? 0 : + Integer.parseInt(dnLimit); } private boolean exceedPipelineNumberLimit(ReplicationFactor factor) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index f072ebb..db7fcae 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -672,13 +672,23 @@ public class SCMPipelineManager implements PipelineManager { } /** - * returns max count of healthy volumes from the set of + * returns min number of healthy volumes from the set of * datanodes constituting the pipeline. * @param pipeline * @return healthy volume count */ - public int getNumHealthyVolumes(Pipeline pipeline) { - return nodeManager.getNumHealthyVolumes(pipeline.getNodes()); + public int minHealthyVolumeNum(Pipeline pipeline) { + return nodeManager.minHealthyVolumeNum(pipeline.getNodes()); + } + + /** + * returns max count of raft log volumes from the set of + * datanodes constituting the pipeline. 
+ * @param pipeline + * @return healthy volume count + */ + public int minPipelineLimit(Pipeline pipeline) { + return nodeManager.minPipelineLimit(pipeline.getNodes()); } protected ReadWriteLock getLock() { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java index a72031c..6b6e8d8 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java @@ -245,6 +245,7 @@ public class TestBlockManager { public void testBlockDistribution() throws Exception { int threadCount = numContainerPerOwnerInPipeline * numContainerPerOwnerInPipeline; + nodeManager.setNumPipelinePerDatanode(1); List<ExecutorService> executors = new ArrayList<>(threadCount); for (int i = 0; i < threadCount; i++) { executors.add(Executors.newSingleThreadExecutor()); @@ -304,6 +305,7 @@ public class TestBlockManager { int threadCount = numContainerPerOwnerInPipeline * numContainerPerOwnerInPipeline; nodeManager.setNumHealthyVolumes(numContainerPerOwnerInPipeline); + nodeManager.setNumPipelinePerDatanode(1); List<ExecutorService> executors = new ArrayList<>(threadCount); for (int i = 0; i < threadCount; i++) { executors.add(Executors.newSingleThreadExecutor()); @@ -365,6 +367,71 @@ public class TestBlockManager { } @Test + public void testBlockDistributionWithMultipleRaftLogDisks() throws Exception { + int threadCount = numContainerPerOwnerInPipeline * + numContainerPerOwnerInPipeline; + int numMetaDataVolumes = 2; + nodeManager.setNumHealthyVolumes(numContainerPerOwnerInPipeline); + nodeManager.setNumMetaDataVolumes(numMetaDataVolumes); + List<ExecutorService> executors = new ArrayList<>(threadCount); + for (int i = 0; i < threadCount; i++) { + executors.add(Executors.newSingleThreadExecutor()); + } + 
pipelineManager.createPipeline(type, factor); + TestUtils.openAllRatisPipelines(pipelineManager); + Map<Long, List<AllocatedBlock>> allocatedBlockMap = + new ConcurrentHashMap<>(); + List<CompletableFuture<AllocatedBlock>> futureList = + new ArrayList<>(threadCount); + for (int i = 0; i < threadCount; i++) { + final CompletableFuture<AllocatedBlock> future = + new CompletableFuture<>(); + CompletableFuture.supplyAsync(() -> { + try { + List<AllocatedBlock> blockList; + AllocatedBlock block = blockManager + .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, + OzoneConsts.OZONE, + new ExcludeList()); + long containerId = block.getBlockID().getContainerID(); + if (!allocatedBlockMap.containsKey(containerId)) { + blockList = new ArrayList<>(); + } else { + blockList = allocatedBlockMap.get(containerId); + } + blockList.add(block); + allocatedBlockMap.put(containerId, blockList); + future.complete(block); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + }, executors.get(i)); + futureList.add(future); + } + try { + CompletableFuture + .allOf(futureList.toArray( + new CompletableFuture[futureList.size()])).get(); + Assert.assertTrue( + pipelineManager.getPipelines(type).size() == 1); + Pipeline pipeline = pipelineManager.getPipelines(type).get(0); + // the pipeline per raft log disk config is set to 1 by default + int numContainers = (int)Math.ceil((double) + (numContainerPerOwnerInPipeline * + numContainerPerOwnerInPipeline)/numMetaDataVolumes); + Assert.assertTrue(numContainers == pipelineManager. + getNumberOfContainers(pipeline.getId())); + Assert.assertTrue( + allocatedBlockMap.size() == numContainers); + Assert.assertTrue(allocatedBlockMap. 
+ values().size() == numContainers); + } catch (Exception e) { + Assert.fail("testAllocateBlockInParallel failed"); + } + } + + @Test public void testAllocateOversizedBlock() throws Exception { long size = 6 * GB; thrown.expectMessage("Unsupported block size"); @@ -434,6 +501,8 @@ public class TestBlockManager { @Test(timeout = 10000) public void testMultipleBlockAllocationWithClosedContainer() throws IOException, TimeoutException, InterruptedException { + nodeManager.setNumPipelinePerDatanode(1); + nodeManager.setNumHealthyVolumes(1); // create pipelines for (int i = 0; i < nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size() / factor diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 4b8b37d..7aca0f3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -68,6 +68,7 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; * Test Helper for testing container Mapping. 
*/ public class MockNodeManager implements NodeManager { + public final static int NUM_PIPELINE_PER_METADATA_DISK = 2; private final static NodeData[] NODES = { new NodeData(10L * OzoneConsts.TB, OzoneConsts.GB), new NodeData(64L * OzoneConsts.TB, 100 * OzoneConsts.GB), @@ -93,6 +94,8 @@ public class MockNodeManager implements NodeManager { private NetworkTopology clusterMap; private ConcurrentMap<String, Set<String>> dnsToUuidMap; private int numHealthyDisksPerDatanode; + private int numRaftLogDisksPerDatanode; + private int numPipelinePerDatanode; public MockNodeManager(NetworkTopologyImpl clusterMap, List<DatanodeDetails> nodes, @@ -123,6 +126,9 @@ public class MockNodeManager implements NodeManager { safemode = false; this.commandMap = new HashMap<>(); numHealthyDisksPerDatanode = 1; + numRaftLogDisksPerDatanode = 1; + numPipelinePerDatanode = numRaftLogDisksPerDatanode * + NUM_PIPELINE_PER_METADATA_DISK; } public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { @@ -585,14 +591,34 @@ public class MockNodeManager implements NodeManager { } @Override - public int getNumHealthyVolumes(List<DatanodeDetails> dnList) { + public int minHealthyVolumeNum(List<DatanodeDetails> dnList) { return numHealthyDisksPerDatanode; } + @Override + public int pipelineLimit(DatanodeDetails dn) { + // by default 1 single node pipeline and 1 three node pipeline + return numPipelinePerDatanode; + } + + @Override + public int minPipelineLimit(List<DatanodeDetails> dn) { + // by default 1 single node pipeline and 1 three node pipeline + return numPipelinePerDatanode; + } + + public void setNumPipelinePerDatanode(int value) { + numPipelinePerDatanode = value; + } + public void setNumHealthyVolumes(int value) { numHealthyDisksPerDatanode = value; } + public void setNumMetaDataVolumes(int value) { + numRaftLogDisksPerDatanode = value; + } + /** * A class to declare some values for the nodes so that our tests * won't fail. 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index 7a58d46..3f3c4ae 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -872,7 +872,7 @@ public class TestSCMNodeManager { .getScmUsed().get()); assertEquals(remaining * nodeCount, (long) nodeManager.getStats() .getRemaining().get()); - assertEquals(1, nodeManager.getNumHealthyVolumes(dnList)); + assertEquals(1, nodeManager.minHealthyVolumeNum(dnList)); dnList.clear(); } } @@ -917,7 +917,7 @@ public class TestSCMNodeManager { assertEquals(1, nodeManager.getNodeCount(HEALTHY)); assertEquals(volumeCount / 2, - nodeManager.getNumHealthyVolumes(dnList)); + nodeManager.minHealthyVolumeNum(dnList)); dnList.clear(); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java index 1274608..f024fc5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java @@ -86,6 +86,7 @@ public class TestPipelinePlacementPolicy { false, PIPELINE_PLACEMENT_MAX_NODES_COUNT); conf = new OzoneConfiguration(); conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, PIPELINE_LOAD_LIMIT); + nodeManager.setNumPipelinePerDatanode(PIPELINE_LOAD_LIMIT); stateManager = new PipelineStateManager(); placementPolicy = new PipelinePlacementPolicy( nodeManager, stateManager, conf); @@ -193,7 +194,8 @@ public class TestPipelinePlacementPolicy { nodeManager.addPipeline(pipeline); stateManager.addPipeline(pipeline); } catch 
(SCMException e) { - break; + throw e; + //break; } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 25957d8..d14e468 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -70,6 +70,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.mockito.InOrder; + import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -275,6 +276,7 @@ public class TestSCMPipelineManager { new SCMPipelineManager(conf, nodeManagerMock, scmMetadataStore.getPipelineTable(), new EventQueue()); pipelineManager.allowPipelineCreation(); + nodeManagerMock.setNumPipelinePerDatanode(1); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManagerMock, pipelineManager.getStateManager(), conf); @@ -336,6 +338,78 @@ public class TestSCMPipelineManager { } @Test + public void testPipelineLimit() throws Exception { + int numMetaDataVolumes = 2; + final OzoneConfiguration config = new OzoneConfiguration(); + config.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); + config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, + false); + // turning off this config will ensure, pipeline creation is determined by + // metadata volume count. 
+ config.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 0); + MockNodeManager nodeManagerMock = new MockNodeManager(true, + 3); + nodeManagerMock.setNumMetaDataVolumes(numMetaDataVolumes); + int pipelinePerDn = numMetaDataVolumes * + MockNodeManager.NUM_PIPELINE_PER_METADATA_DISK; + nodeManagerMock.setNumPipelinePerDatanode(pipelinePerDn); + SCMPipelineManager pipelineManager = + new SCMPipelineManager(config, nodeManagerMock, + scmMetadataStore.getPipelineTable(), new EventQueue()); + pipelineManager.allowPipelineCreation(); + PipelineProvider mockRatisProvider = + new MockRatisPipelineProvider(nodeManagerMock, + pipelineManager.getStateManager(), config); + pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, + mockRatisProvider); + + MetricsRecordBuilder metrics = getMetrics( + SCMPipelineMetrics.class.getSimpleName()); + long numPipelineAllocated = getLongCounter("NumPipelineAllocated", + metrics); + Assert.assertEquals(0, numPipelineAllocated); + + // max limit on no of pipelines is 4 + for (int i = 0; i < pipelinePerDn; i++) { + Pipeline pipeline = pipelineManager + .createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + Assert.assertNotNull(pipeline); + } + + metrics = getMetrics( + SCMPipelineMetrics.class.getSimpleName()); + numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics); + Assert.assertEquals(4, numPipelineAllocated); + + long numPipelineCreateFailed = getLongCounter( + "NumPipelineCreationFailed", metrics); + Assert.assertEquals(0, numPipelineCreateFailed); + //This should fail... + try { + pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + fail(); + } catch (SCMException ioe) { + // pipeline creation failed this time. 
+ Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE, + ioe.getResult()); + } + + metrics = getMetrics( + SCMPipelineMetrics.class.getSimpleName()); + numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics); + Assert.assertEquals(4, numPipelineAllocated); + + numPipelineCreateFailed = getLongCounter( + "NumPipelineCreationFailed", metrics); + Assert.assertEquals(1, numPipelineCreateFailed); + + // clean up + pipelineManager.close(); + } + + @Test public void testActivateDeactivatePipeline() throws IOException { final SCMPipelineManager pipelineManager = new SCMPipelineManager(conf, nodeManager, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index a9b879f..6d088fe 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -345,7 +345,17 @@ public class ReplicationNodeManagerMock implements NodeManager { } @Override - public int getNumHealthyVolumes(List<DatanodeDetails> dnList) { + public int minHealthyVolumeNum(List<DatanodeDetails> dnList) { + return 0; + } + + @Override + public int pipelineLimit(DatanodeDetails dn) { + return 0; + } + + @Override + public int minPipelineLimit(List<DatanodeDetails> dn) { return 0; } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java index 4fa5ea4..2d1acf7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java +++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java @@ -72,6 +72,9 @@ public class TestContainerMapper { dbPath = GenericTestUtils.getRandomizedTempPath(); conf.set(OZONE_OM_DB_DIRS, dbPath); conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, "100MB"); + // By default, 2 pipelines are created. Setting the value to 6 will ensure + // each pipeline can have 3 containers open. + conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 6); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3) .setScmId(SCM_ID) --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org