This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 73e6f90aae HDDS-9345. Add CapacityPipelineChoosePolicy considering
datanode storage space (#5354)
73e6f90aae is described below
commit 73e6f90aae4dc22a265361b9a63ccb25a5e6919a
Author: Hongbing Wang <[email protected]>
AuthorDate: Tue Jan 23 01:11:32 2024 +0800
HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage
space (#5354)
---
.../java/org/apache/hadoop/hdds/scm/ScmConfig.java | 32 +++--
.../hadoop/hdds/scm/PipelineChoosePolicy.java | 10 ++
.../container/placement/metrics/SCMNodeMetric.java | 16 ++-
.../container/placement/metrics/SCMNodeStat.java | 9 ++
.../algorithms/CapacityPipelineChoosePolicy.java | 136 +++++++++++++++++++++
.../algorithms/PipelineChoosePolicyFactory.java | 10 +-
.../hdds/scm/server/StorageContainerManager.java | 4 +-
.../pipeline/TestWritableECContainerProvider.java | 24 +++-
.../TestCapacityPipelineChoosePolicy.java | 107 ++++++++++++++++
.../TestPipelineChoosePolicyFactory.java | 19 +--
10 files changed, 340 insertions(+), 27 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
index 46816a63d3..2fc04e00f2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
@@ -73,8 +73,17 @@ public class ScmConfig extends ReconfigurableConfig {
+ "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
+ "The class decides which pipeline will be used to find or "
+ "allocate Ratis containers. If not set, "
- + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms. "
- + "RandomPipelineChoosePolicy will be used as default value."
+ + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ + "RandomPipelineChoosePolicy will be used as default value. "
+ + "The following values can be used, "
+ + "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ + "RandomPipelineChoosePolicy : random choose one pipeline. "
+ + "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ + "HealthyPipelineChoosePolicy : random choose one healthy pipeline.
"
+ + "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ + "CapacityPipelineChoosePolicy : choose the pipeline with lower "
+ + "utilization from the two pipelines. Note that random choose "
+ + "method will be executed twice in this policy."
)
private String pipelineChoosePolicyName;
@@ -85,11 +94,20 @@ public class ScmConfig extends ReconfigurableConfig {
tags = { ConfigTag.SCM, ConfigTag.PIPELINE },
description =
"The full name of class which implements "
- + "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
- + "The class decides which pipeline will be used when "
- + "selecting an EC Pipeline. If not set, "
- + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms. "
- + "RandomPipelineChoosePolicy will be used as default value."
+ + "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
+ + "The class decides which pipeline will be used when "
+ + "selecting an EC Pipeline. If not set, "
+ + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ + "RandomPipelineChoosePolicy will be used as default value. "
+ + "The following values can be used, "
+ + "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ + "RandomPipelineChoosePolicy : random choose one pipeline. "
+ + "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ + "HealthyPipelineChoosePolicy : random choose one healthy pipeline.
"
+ + "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ + "CapacityPipelineChoosePolicy : choose the pipeline with lower "
+ + "utilization from the two pipelines. Note that random choose "
+ + "method will be executed twice in this policy."
)
private String ecPipelineChoosePolicyName;
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
similarity index 86%
rename from
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
rename to
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
index 76439a7846..e1d0fdd35a 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import java.util.List;
@@ -26,6 +27,15 @@ import java.util.List;
*/
public interface PipelineChoosePolicy {
+ /**
+ * Updates the policy with NodeManager.
+ * @return updated policy.
+ */
+ default PipelineChoosePolicy init(final NodeManager nodeManager) {
+ // override if the policy requires nodeManager
+ return this;
+ }
+
/**
* Given an initial list of pipelines, return one of the pipelines.
*
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java
index 330bf67416..094e535dcb 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java
@@ -23,7 +23,8 @@ import com.google.common.base.Preconditions;
/**
* SCM Node Metric that is used in the placement classes.
*/
-public class SCMNodeMetric implements DatanodeMetric<SCMNodeStat, Long> {
+public class SCMNodeMetric implements DatanodeMetric<SCMNodeStat, Long>,
+ Comparable<SCMNodeMetric> {
private SCMNodeStat stat;
/**
@@ -195,12 +196,12 @@ public class SCMNodeMetric implements
DatanodeMetric<SCMNodeStat, Long> {
* @throws ClassCastException if the specified object's type prevents it
* from being compared to this object.
*/
- //@Override
- public int compareTo(SCMNodeStat o) {
- if (isEqual(o)) {
+ @Override
+ public int compareTo(SCMNodeMetric o) {
+ if (isEqual(o.get())) {
return 0;
}
- if (isGreater(o)) {
+ if (isGreater(o.get())) {
return 1;
} else {
return -1;
@@ -225,4 +226,9 @@ public class SCMNodeMetric implements
DatanodeMetric<SCMNodeStat, Long> {
public int hashCode() {
return stat != null ? stat.hashCode() : 0;
}
+
+ @Override
+ public String toString() {
+ return "SCMNodeMetric{" + stat.toString() + '}';
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
index 2a848a04ef..5456e6ee52 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
@@ -174,4 +174,13 @@ public class SCMNodeStat implements NodeStat {
return Long.hashCode(capacity.get() ^ scmUsed.get() ^ remaining.get() ^
committed.get() ^ freeSpaceToSpare.get());
}
+
+ @Override
+ public String toString() {
+ return "SCMNodeStat{" +
+ "capacity=" + capacity.get() +
+ ", scmUsed=" + scmUsed.get() +
+ ", remaining=" + remaining.get() +
+ '}';
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java
new file mode 100644
index 0000000000..a95a473de6
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Pipeline choose policy that randomly choose pipeline with relatively
+ * lower utilization.
+ * <p>
+ * The Algorithm is as follows, Pick 2 random pipelines from a given pool of
+ * pipelines and then pick the pipeline which has lower utilization.
+ * This leads to a higher probability of pipelines with lower utilization
+ * to be picked.
+ * <p>
+ * For those wondering why we choose two pipelines randomly and choose the
+ * pipeline with lower utilization. There are links to this original papers in
+ * HDFS-11564.
+ * Also, the same algorithm applies to SCMContainerPlacementCapacity.
+ * <p>
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+ private NodeManager nodeManager;
+
+ private final PipelineChoosePolicy healthPolicy;
+
+ public CapacityPipelineChoosePolicy() {
+ healthPolicy = new HealthyPipelineChoosePolicy();
+ }
+
+ @Override
+ public PipelineChoosePolicy init(final NodeManager scmNodeManager) {
+ this.nodeManager = scmNodeManager;
+ return this;
+ }
+
+ @Override
+ public Pipeline choosePipeline(List<Pipeline> pipelineList,
+ PipelineRequestInformation pri) {
+ Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);
+ Pipeline pipeline2 = healthPolicy.choosePipeline(pipelineList, pri);
+
+ int result = new CapacityPipelineComparator(this)
+ .compare(pipeline1, pipeline2);
+
+ LOG.debug("Chosen the {} pipeline", result <= 0 ? "first" : "second");
+ return result <= 0 ? pipeline1 : pipeline2;
+ }
+
+ @Override
+ public int choosePipelineIndex(List<Pipeline> pipelineList,
+ PipelineRequestInformation pri) {
+ List<Pipeline> mutableList = new ArrayList<>(pipelineList);
+ Pipeline pipeline = choosePipeline(mutableList, pri);
+ return pipelineList.indexOf(pipeline);
+ }
+
+ /**
+ * Return a list of SCMNodeMetrics corresponding to the DataNodes in the
+ * pipeline, sorted in descending order based on scm used storage.
+ * @param pipeline pipeline
+ * @return sorted SCMNodeMetrics corresponding the pipeline
+ */
+ private Deque<SCMNodeMetric> getSortedNodeFromPipeline(Pipeline pipeline) {
+ Deque<SCMNodeMetric> sortedNodeStack = new ArrayDeque<>();
+ pipeline.getNodes().stream()
+ .map(nodeManager::getNodeStat)
+ .filter(Objects::nonNull)
+ .sorted()
+ .forEach(sortedNodeStack::push);
+ return sortedNodeStack;
+ }
+
+ static class CapacityPipelineComparator implements Comparator<Pipeline> {
+ private final CapacityPipelineChoosePolicy policy;
+
+ CapacityPipelineComparator(CapacityPipelineChoosePolicy policy) {
+ this.policy = policy;
+ }
+ @Override
+ public int compare(Pipeline p1, Pipeline p2) {
+ if (p1.getId().equals(p2.getId())) {
+ LOG.debug("Compare the same pipeline {}", p1);
+ return 0;
+ }
+ Deque<SCMNodeMetric> sortedNodes1 = policy.getSortedNodeFromPipeline(p1);
+ Deque<SCMNodeMetric> sortedNodes2 = policy.getSortedNodeFromPipeline(p2);
+
+ // Compare the scmUsed weight of the node in the two sorted node stacks
+ LOG.debug("Compare scmUsed weight in pipelines, first : {}, second : {}",
+ sortedNodes1, sortedNodes2);
+ int result = 0;
+ int count = 0;
+ while (result == 0 &&
+ !sortedNodes1.isEmpty() && !sortedNodes2.isEmpty()) {
+ count++;
+ LOG.debug("Compare {} round", count);
+ result = sortedNodes1.pop().compareTo(sortedNodes2.pop());
+ }
+ return result;
+ }
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java
index d040dbe2bc..90736a0181 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,14 +49,14 @@ public final class PipelineChoosePolicyFactory {
private PipelineChoosePolicyFactory() {
}
- public static PipelineChoosePolicy getPolicy(
+ public static PipelineChoosePolicy getPolicy(final NodeManager nodeManager,
ScmConfig scmConfig, boolean forEC) throws SCMException {
Class<? extends PipelineChoosePolicy> policyClass = null;
String policyName = forEC ? scmConfig.getECPipelineChoosePolicyName() :
scmConfig.getPipelineChoosePolicyName();
try {
policyClass = getClass(policyName, PipelineChoosePolicy.class);
- return createPipelineChoosePolicyFromClass(policyClass);
+ return createPipelineChoosePolicyFromClass(nodeManager, policyClass);
} catch (Exception e) {
Class<? extends PipelineChoosePolicy> defaultPolicy = forEC ?
OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT :
@@ -64,13 +65,14 @@ public final class PipelineChoosePolicyFactory {
LOG.error("Met an exception while create pipeline choose policy "
+ "for the given class {}. Fallback to the default pipeline "
+ " choose policy {}", policyName, defaultPolicy, e);
- return createPipelineChoosePolicyFromClass(defaultPolicy);
+ return createPipelineChoosePolicyFromClass(nodeManager, defaultPolicy);
}
throw e;
}
}
private static PipelineChoosePolicy createPipelineChoosePolicyFromClass(
+ final NodeManager nodeManager,
Class<? extends PipelineChoosePolicy> policyClass) throws SCMException {
Constructor<? extends PipelineChoosePolicy> constructor;
try {
@@ -86,7 +88,7 @@ public final class PipelineChoosePolicyFactory {
}
try {
- return constructor.newInstance();
+ return constructor.newInstance().init(nodeManager);
} catch (Exception e) {
throw new RuntimeException("Failed to instantiate class " +
policyClass.getCanonicalName() + " for " + e.getMessage());
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 1a3ea2515f..046be68760 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -804,9 +804,9 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
ScmConfig scmConfig = conf.getObject(ScmConfig.class);
pipelineChoosePolicy = PipelineChoosePolicyFactory
- .getPolicy(scmConfig, false);
+ .getPolicy(scmNodeManager, scmConfig, false);
ecPipelineChoosePolicy = PipelineChoosePolicyFactory
- .getPolicy(scmConfig, true);
+ .getPolicy(scmNodeManager, scmConfig, true);
if (configurator.getWritableContainerFactory() != null) {
writableContainerFactory = configurator.getWritableContainerFactory();
} else {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
index 54d2ffed82..4f86450d03 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@ -34,7 +34,11 @@ import
org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
+import org.apache.hadoop.hdds.scm.net.NodeSchema;
+import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import
org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
+import
org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.CapacityPipelineChoosePolicy;
import
org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
import
org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy;
import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -54,8 +58,13 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import static
org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -84,7 +93,7 @@ public class TestWritableECContainerProvider {
private OzoneConfiguration conf;
private DBStore dbStore;
private SCMHAManager scmhaManager;
- private MockNodeManager nodeManager;
+ private static MockNodeManager nodeManager;
private WritableContainerProvider<ECReplicationConfig> provider;
private ECReplicationConfig repConfig;
@@ -93,8 +102,20 @@ public class TestWritableECContainerProvider {
public static Collection<PipelineChoosePolicy> policies() {
Collection<PipelineChoosePolicy> policies = new ArrayList<>();
+ // init nodeManager
+ NodeSchemaManager.getInstance().init(new NodeSchema[]
+ {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA}, true);
+ NetworkTopologyImpl cluster =
+ new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+ int count = 10;
+ List<DatanodeDetails> datanodes = IntStream.range(0, count)
+ .mapToObj(i -> MockDatanodeDetails.randomDatanodeDetails())
+ .collect(Collectors.toList());
+ nodeManager = new MockNodeManager(cluster, datanodes, false, count);
+
policies.add(new RandomPipelineChoosePolicy());
policies.add(new HealthyPipelineChoosePolicy());
+ policies.add(new CapacityPipelineChoosePolicy().init(nodeManager));
return policies;
}
@@ -110,7 +131,6 @@ public class TestWritableECContainerProvider {
dbStore = DBStoreBuilder.createDBStore(
conf, new SCMDBDefinition());
scmhaManager = SCMHAManagerStub.getInstance(true);
- nodeManager = new MockNodeManager(true, 10);
pipelineManager =
new MockPipelineManager(dbStore, scmhaManager, nodeManager);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java
new file mode 100644
index 0000000000..421d2396bf
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for the capacity pipeline choose policy.
+ */
+public class TestCapacityPipelineChoosePolicy {
+
+ @Test
+ public void testChoosePipeline() throws Exception {
+
+ // given 4 datanode
+ List<DatanodeDetails> datanodes = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
+ }
+ // dn0 dn1 dn2 dn3
+ // used 0 10 20 30
+ NodeManager mockNodeManager = mock(NodeManager.class);
+ when(mockNodeManager.getNodeStat(datanodes.get(0)))
+ .thenReturn(new SCMNodeMetric(100L, 0, 100L, 0, 0));
+ when(mockNodeManager.getNodeStat(datanodes.get(1)))
+ .thenReturn(new SCMNodeMetric(100L, 10L, 90L, 0, 0));
+ when(mockNodeManager.getNodeStat(datanodes.get(2)))
+ .thenReturn(new SCMNodeMetric(100L, 20L, 80L, 0, 0));
+ when(mockNodeManager.getNodeStat(datanodes.get(3)))
+ .thenReturn(new SCMNodeMetric(100L, 30L, 70L, 0, 0));
+
+ PipelineChoosePolicy policy = new
CapacityPipelineChoosePolicy().init(mockNodeManager);
+
+ // generate 4 pipelines, and every pipeline has 3 datanodes
+ //
+ // pipeline0 dn1 dn2 dn3
+ // pipeline1 dn0 dn2 dn3
+ // pipeline2 dn0 dn1 dn3
+ // pipeline3 dn0 dn1 dn2
+ //
+ // In the above scenario, pipeline0 vs pipeline1 runs through three rounds
+ // of comparisons, (dn3 <-> dn3) -> (dn2 <-> dn2 ) -> (dn1 <-> dn0),
+ // finally comparing dn0 and dn1, and dn0 wins, so pipeline1 is selected.
+ //
+ List<Pipeline> pipelines = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ List<DatanodeDetails> dns = new ArrayList<>();
+ for (int j = 0; j < datanodes.size(); j++) {
+ if (i != j) {
+ dns.add(datanodes.get(j));
+ }
+ }
+ Pipeline pipeline = MockPipeline.createPipeline(dns);
+ MockRatisPipelineProvider.markPipelineHealthy(pipeline);
+ pipelines.add(pipeline);
+ }
+
+ Map<Pipeline, Integer> selectedCount = new HashMap<>();
+ for (Pipeline pipeline : pipelines) {
+ selectedCount.put(pipeline, 0);
+ }
+ for (int i = 0; i < 1000; i++) {
+ // choosePipeline
+ Pipeline pipeline = policy.choosePipeline(pipelines, null);
+ assertNotNull(pipeline);
+ selectedCount.put(pipeline, selectedCount.get(pipeline) + 1);
+ }
+
+ // The selected count from most to least should be :
+ // pipeline3 > pipeline2 > pipeline1 > pipeline0
+
assertThat(selectedCount.get(pipelines.get(3))).isGreaterThan(selectedCount.get(pipelines.get(2)));
+
assertThat(selectedCount.get(pipelines.get(2))).isGreaterThan(selectedCount.get(pipelines.get(1)));
+
assertThat(selectedCount.get(pipelines.get(1))).isGreaterThan(selectedCount.get(pipelines.get(0)));
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
index 7d0a72ed2f..82fed5953a 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
@@ -21,7 +21,9 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -42,17 +44,20 @@ public class TestPipelineChoosePolicyFactory {
private ScmConfig scmConfig;
+ private NodeManager nodeManager;
+
@BeforeEach
public void setup() {
//initialize network topology instance
conf = new OzoneConfiguration();
scmConfig = conf.getObject(ScmConfig.class);
+ nodeManager = new MockNodeManager(true, 5);
}
@Test
public void testDefaultPolicy() throws IOException {
PipelineChoosePolicy policy = PipelineChoosePolicyFactory
- .getPolicy(scmConfig, false);
+ .getPolicy(nodeManager, scmConfig, false);
assertSame(OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
policy.getClass());
}
@@ -60,7 +65,7 @@ public class TestPipelineChoosePolicyFactory {
@Test
public void testDefaultPolicyEC() throws IOException {
PipelineChoosePolicy policy = PipelineChoosePolicyFactory
- .getPolicy(scmConfig, true);
+ .getPolicy(nodeManager, scmConfig, true);
assertSame(OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
policy.getClass());
}
@@ -69,7 +74,7 @@ public class TestPipelineChoosePolicyFactory {
public void testNonDefaultPolicyEC() throws IOException {
scmConfig.setECPipelineChoosePolicyName(DummyGoodImpl.class.getName());
PipelineChoosePolicy policy = PipelineChoosePolicyFactory
- .getPolicy(scmConfig, true);
+ .getPolicy(nodeManager, scmConfig, true);
assertSame(DummyGoodImpl.class, policy.getClass());
}
@@ -121,10 +126,10 @@ public class TestPipelineChoosePolicyFactory {
scmConfig.setPipelineChoosePolicyName(DummyImpl.class.getName());
scmConfig.setECPipelineChoosePolicyName(DummyImpl.class.getName());
PipelineChoosePolicy policy =
- PipelineChoosePolicyFactory.getPolicy(scmConfig, false);
+ PipelineChoosePolicyFactory.getPolicy(nodeManager, scmConfig, false);
assertSame(OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
policy.getClass());
- policy = PipelineChoosePolicyFactory.getPolicy(scmConfig, true);
+ policy = PipelineChoosePolicyFactory.getPolicy(nodeManager, scmConfig,
true);
assertSame(OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
policy.getClass());
}
@@ -137,10 +142,10 @@ public class TestPipelineChoosePolicyFactory {
scmConfig.setECPipelineChoosePolicyName(
"org.apache.hadoop.hdds.scm.pipeline.choose.policy.HelloWorld");
PipelineChoosePolicy policy =
- PipelineChoosePolicyFactory.getPolicy(scmConfig, false);
+ PipelineChoosePolicyFactory.getPolicy(nodeManager, scmConfig, false);
assertSame(OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
policy.getClass());
- policy = PipelineChoosePolicyFactory.getPolicy(scmConfig, true);
+ policy = PipelineChoosePolicyFactory.getPolicy(nodeManager, scmConfig,
true);
assertSame(OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
policy.getClass());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]