This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b3a2a3e  HDDS-5441. Disallow same set of DNs to be part of multiple 
pipelines.  (#2416)
b3a2a3e is described below

commit b3a2a3eb856c30436b835e18bf6d5ceb88a64771
Author: bshashikant <[email protected]>
AuthorDate: Wed Aug 4 13:28:36 2021 +0530

    HDDS-5441. Disallow same set of DNs to be part of multiple pipelines.  
(#2416)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   5 +
 .../common/src/main/resources/ozone-default.xml    |   8 +
 .../hadoop/hdds/scm/SCMCommonPlacementPolicy.java  |  14 ++
 .../java/org/apache/hadoop/hdds/scm/ScmUtils.java  |  10 ++
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |   9 ++
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  27 ++++
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |   6 +
 .../hdds/scm/pipeline/TestMultiRaftSetup.java      | 173 +++++++++++++++++++++
 8 files changed, 252 insertions(+)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 8d0aab3..0394591 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -356,6 +356,11 @@ public final class ScmConfigKeys {
           "ozone.scm.datanode.pipeline.limit";
   public static final int OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT = 2;
 
+  public static final String OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS =
+      "ozone.scm.datanode.disallow.same.peers";
+  public static final boolean OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS_DEFAULT =
+      false;
+
   // Upper limit for how many pipelines can be created
   // across the cluster nodes managed by SCM.
   // Only for test purpose now.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 9abfc1a..5e480ae 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -844,6 +844,14 @@
   </description>
   </property>
   <property>
+    <name>ozone.scm.datanode.disallow.same.peers</name>
+    <value>false</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>Disallows same set of datanodes to participate in multiple
+      pipelines when set to true. Default is set to false.
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.ratis.pipeline.limit</name>
     <value>0</value>
     <tag>OZONE, SCM, PIPELINE</tag>
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
index bec0db3..415a8e4 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.hdds.scm;
 
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -50,6 +51,7 @@ public abstract class SCMCommonPlacementPolicy implements 
PlacementPolicy {
   private final NodeManager nodeManager;
   private final Random rand;
   private final ConfigurationSource conf;
+  private final boolean shouldRemovePeers;
 
   /**
    * Return for replication factor 1 containers where the placement policy
@@ -73,6 +75,7 @@ public abstract class SCMCommonPlacementPolicy implements 
PlacementPolicy {
     this.nodeManager = nodeManager;
     this.rand = new Random();
     this.conf = conf;
+    this.shouldRemovePeers = ScmUtils.shouldRemovePeers(conf);
   }
 
   /**
@@ -236,6 +239,7 @@ public abstract class SCMCommonPlacementPolicy implements 
PlacementPolicy {
       // invoke the choose function defined in the derived classes.
       DatanodeDetails nodeId = chooseNode(healthyNodes);
       if (nodeId != null) {
+        removePeers(nodeId, healthyNodes);
         results.add(nodeId);
       }
     }
@@ -314,4 +318,14 @@ public abstract class SCMCommonPlacementPolicy implements 
PlacementPolicy {
     return new ContainerPlacementStatusDefault(
         (int)currentRackCount, requiredRacks, numRacks);
   }
+
+  /**
+   * Removes the datanode peers from all the existing pipelines for this dn.
+   */
+  public void removePeers(DatanodeDetails dn,
+      List<DatanodeDetails> healthyList) {
+    if (shouldRemovePeers) {
+      healthyList.removeAll(nodeManager.getPeerList(dn));
+    }
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
index fa4bfcb..4ab9ec2 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
@@ -181,4 +181,14 @@ public final class ScmUtils {
             "nodeId. If want to configure same port configure {}", confKey,
         portKey, portKey);
   }
+
+  public static boolean shouldRemovePeers(final ConfigurationSource conf) {
+    int pipelineLimitPerDn =
+        conf.getInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
+            ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
+    return (1 != pipelineLimitPerDn && conf
+        .getBoolean(ScmConfigKeys.OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS,
+            ScmConfigKeys.OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS_DEFAULT));
+  }
+
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index a934d03..43008cd 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -44,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.Collection;
 
 /**
  * A node manager supports a simple interface for managing a datanode.
@@ -307,6 +308,14 @@ public interface NodeManager extends 
StorageContainerNodeProtocol,
 
   int minPipelineLimit(List<DatanodeDetails> dn);
 
+  /**
+   * Gets the peers in all the pipelines for the particular datanode.
+   * @param dn datanode
+   */
+  default Collection<DatanodeDetails> getPeerList(DatanodeDetails dn) {
+    return null;
+  }
+
   default HDDSLayoutVersionManager getLayoutVersionManager(){
     return null;
   }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index c045d07..c3db8d7 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -31,6 +31,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.Collections;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.stream.Collectors;
@@ -64,6 +66,8 @@ import 
org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
@@ -885,6 +889,29 @@ public class SCMNodeManager implements NodeManager {
     return Collections.min(pipelineCountList);
   }
 
+  @Override
+  public Collection<DatanodeDetails> getPeerList(DatanodeDetails dn) {
+    HashSet<DatanodeDetails> dns = new HashSet<>();
+    Preconditions.checkNotNull(dn);
+    Set<PipelineID> pipelines =
+        nodeStateManager.getPipelineByDnID(dn.getUuid());
+    PipelineManager pipelineManager = scmContext.getScm().getPipelineManager();
+    if (!pipelines.isEmpty()) {
+      pipelines.forEach(id -> {
+        try {
+          Pipeline pipeline = pipelineManager.getPipeline(id);
+          List<DatanodeDetails> peers = pipeline.getNodes();
+          dns.addAll(peers);
+        } catch (PipelineNotFoundException pnfe) {
+          //ignore the pipeline not found exception here
+        }
+      });
+    }
+    // remove self node from the set
+    dns.remove(dn);
+    return dns;
+  }
+
   /**
    * Get set of pipelines a datanode is part of.
    *
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index d330df6..942d04c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -290,6 +290,7 @@ public final class PipelinePlacementPolicy extends 
SCMCommonPlacementPolicy {
     DatanodeDetails anchor = chooseNode(healthyNodes);
     if (anchor != null) {
       results.add(anchor);
+      removePeers(anchor, healthyNodes);
       exclude.add(anchor);
     } else {
       LOG.warn("Unable to find healthy node for anchor(first) node.");
@@ -309,6 +310,7 @@ public final class PipelinePlacementPolicy extends 
SCMCommonPlacementPolicy {
       // Rack awareness is detected.
       rackAwareness = true;
       results.add(nextNode);
+      removePeers(nextNode, healthyNodes);
       exclude.add(nextNode);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Second node chosen: {}", nextNode);
@@ -339,6 +341,7 @@ public final class PipelinePlacementPolicy extends 
SCMCommonPlacementPolicy {
 
       if (pick != null) {
         results.add(pick);
+        removePeers(pick, healthyNodes);
         exclude.add(pick);
         LOG.debug("Remaining node chosen: {}", pick);
       } else {
@@ -376,6 +379,9 @@ public final class PipelinePlacementPolicy extends 
SCMCommonPlacementPolicy {
     DatanodeDetails selectedNode =
             healthyNodes.get(getRand().nextInt(healthyNodes.size()));
     healthyNodes.remove(selectedNode);
+    if (selectedNode != null) {
+      removePeers(selectedNode, healthyNodes);
+    }
     return selectedNode;
   }
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestMultiRaftSetup.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestMultiRaftSetup.java
new file mode 100644
index 0000000..84d025c
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestMultiRaftSetup.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+
+import org.apache.ozone.test.LambdaTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Collection;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for MultiRaft set up.
+ */
+public class  TestMultiRaftSetup {
+
+  private MiniOzoneCluster cluster;
+  private StorageContainerManager scm;
+  private NodeManager nodeManager;
+  private PipelineManager pipelineManager;
+
+  private long pipelineDestroyTimeoutInMillis;
+  private static final ReplicationConfig RATIS_THREE =
+      ReplicationConfig.fromTypeAndFactor(HddsProtos.ReplicationType.RATIS,
+          HddsProtos.ReplicationFactor.THREE);
+
+  public void init(int dnCount, OzoneConfiguration conf) throws Exception {
+    cluster =
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(dnCount).build();
+    conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
+        TimeUnit.MILLISECONDS);
+    pipelineDestroyTimeoutInMillis = 1000;
+    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
+        pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
+    cluster.waitForClusterToBeReady();
+    scm = cluster.getStorageContainerManager();
+    nodeManager = scm.getScmNodeManager();
+    pipelineManager = scm.getPipelineManager();
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testMultiRaftSamePeers() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 2);
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS,
+        false);
+    init(3, conf);
+    waitForPipelineCreated(2);
+    Assert.assertEquals(2, pipelineManager.getPipelines(ReplicationConfig
+        .fromTypeAndFactor(HddsProtos.ReplicationType.RATIS,
+            ReplicationFactor.THREE)).size());
+    assertNotSamePeers();
+    shutdown();
+  }
+
+  @Test
+  public void testMultiRaftNotSamePeers() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 2);
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS, 
true);
+    init(3, conf);
+    waitForPipelineCreated(1);
+    // datanode pipeline limit is set to 2, but only one set of 3 pipelines
+    // will be created. Further pipeline creation should fail
+    Assert.assertEquals(1, pipelineManager.getPipelines(RATIS_THREE).size());
+    try {
+      pipelineManager.createPipeline(RATIS_THREE);
+      Assert.fail();
+    } catch (IOException ex) {
+    }
+    shutdown();
+  }
+
+  @Test
+  public void testMultiRaft() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 2);
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS, 
true);
+    init(5, conf);
+    waitForPipelineCreated(2);
+    // datanode pipeline limit is set to 2, but only two Ratis THREE pipeline
+    // will be created. Further pipeline creation should fail.
+    // For example, with d1,d2, d3, d4, d5, only d1 d2 d3 and d1 d4 d5 can form
+    // pipelines as none of the peers from any of the existing pipelines will be
+    // repeated
+    Assert.assertEquals(2, pipelineManager.getPipelines(RATIS_THREE).size());
+    List<DatanodeDetails> dns = nodeManager.getAllNodes().stream()
+        .filter((dn) -> nodeManager.getPipelinesCount(dn) > 2).collect(
+            Collectors.toList());
+    Assert.assertEquals(1, dns.size());
+    try {
+      pipelineManager.createPipeline(RATIS_THREE);
+      Assert.fail();
+    } catch (IOException ex) {
+    }
+    Collection<PipelineID> pipelineIds = nodeManager.getPipelines(dns.get(0));
+    // Only one datanode should have 3 pipelines in total, 1 RATIS ONE pipeline
+    // and 2 RATIS 3 pipeline
+    Assert.assertEquals(3, pipelineIds.size());
+    List<Pipeline> pipelines = new ArrayList<>();
+    pipelineIds.forEach((id) -> {
+      try {
+        pipelines.add(pipelineManager.getPipeline(id));
+      } catch (PipelineNotFoundException pnfe) {
+      }
+    });
+    Assert.assertEquals(1, pipelines.stream()
+        .filter((p) -> (p.getReplicationConfig().getRequiredNodes() == 1))
+        .collect(Collectors.toList()).size());
+    Assert.assertEquals(2, pipelines.stream()
+        .filter((p) -> (p.getReplicationConfig().getRequiredNodes() == 3))
+        .collect(Collectors.toList()).size());
+    shutdown();
+  }
+  private void assertNotSamePeers() {
+    nodeManager.getAllNodes().forEach((dn) ->{
+      Collection<DatanodeDetails> peers = nodeManager.getPeerList(dn);
+      Assert.assertFalse(peers.contains(dn));
+      List<DatanodeDetails> trimList = nodeManager.getAllNodes();
+      trimList.remove(dn);
+      Assert.assertTrue(peers.containsAll(trimList));
+    });
+  }
+
+  private void waitForPipelineCreated(int num) throws Exception {
+    LambdaTestUtils.await(10000, 500, () -> {
+      List<Pipeline> pipelines =
+          pipelineManager.getPipelines(RATIS_THREE);
+      return pipelines.size() == num;
+    });
+  }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to