This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f8a62d6  HDDS-3810. Add the logic to distribute open containers among the pipelines of a datanode. (#1274)
f8a62d6 is described below

commit f8a62d6d506974b5ee2df64bdc2ea3aea6ff7610
Author: bshashikant <shashik...@apache.org>
AuthorDate: Mon Oct 5 11:02:22 2020 +0530

    HDDS-3810. Add the logic to distribute open containers among the pipelines of a datanode. (#1274)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  4 ++
 .../common/src/main/resources/ozone-default.xml    |  7 ++
 .../hdds/scm/container/SCMContainerManager.java    | 11 +++-
 .../apache/hadoop/hdds/scm/node/DatanodeInfo.java  | 33 ++++++++++
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  6 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       | 53 +++++++++++++++-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  4 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |  7 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |  6 +-
 .../hdds/scm/pipeline/SCMPipelineManager.java      | 16 ++++-
 .../hadoop/hdds/scm/block/TestBlockManager.java    | 69 ++++++++++++++++++++
 .../hadoop/hdds/scm/container/MockNodeManager.java | 28 +++++++-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  4 +-
 .../scm/pipeline/TestPipelinePlacementPolicy.java  |  4 +-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  | 74 ++++++++++++++++++++++
 .../testutils/ReplicationNodeManagerMock.java      | 12 +++-
 .../hadoop/ozone/fsck/TestContainerMapper.java     |  3 +
 17 files changed, 319 insertions(+), 22 deletions(-)
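
The sizing rule the patch introduces, stated once with illustrative
numbers: open containers per pipeline = ceil((containers per volume x
min healthy data volumes) / min pipeline limit), e.g. ceil((3 x 10) / 4)
= 8. The pipeline limit itself now derives from the reported raft log
(metadata) disk count unless ozone.datanode.pipeline.limit is set
explicitly.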

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 672b440..540d2c0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -313,6 +313,10 @@ public final class ScmConfigKeys {
       OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
       "ozone.scm.keyvalue.container.deletion-choosing.policy";
 
+  public static final String OZONE_SCM_PIPELINE_PER_METADATA_VOLUME =
+      "ozone.scm.pipeline.per.metadata.disk";
+
+  public static final int OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT = 2;
   // Max timeout for pipeline to stay at ALLOCATED state before scrubbed.
   public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT =
       "ozone.scm.pipeline.allocated.timeout";
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index f16ff3f..70d35a3 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -822,6 +822,13 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.pipeline.per.metadata.disk</name>
+    <value>2</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>Number of pipelines to be created per raft log disk.
+    </description>
+  </property>
+  <property>
   <name>ozone.datanode.pipeline.limit</name>
   <value>2</value>
   <tag>OZONE, SCM, PIPELINE</tag>
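
A quick worked example of how the two properties interact, assuming the
defaults shown above: a datanode reporting 2 raft log disks is allowed
2 disks x 2 pipelines per disk = 4 pipelines, while an explicit
ozone.datanode.pipeline.limit bypasses that derivation entirely.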
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 5857c4c..fa286a2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -107,6 +107,14 @@ public class SCMContainerManager implements ContainerManager {
     scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
   }
 
+  private int getOpenContainerCountPerPipeline(Pipeline pipeline) {
+    int minContainerCountPerDn = numContainerPerVolume *
+        pipelineManager.minHealthyVolumeNum(pipeline);
+    int minPipelineCountPerDn = pipelineManager.minPipelineLimit(pipeline);
+    return (int) Math.ceil(
+        ((double) minContainerCountPerDn / minPipelineCountPerDn));
+  }
+
   private void loadExistingContainers() throws IOException {
 
     TableIterator<ContainerID, ? extends KeyValue<ContainerID, ContainerInfo>>
@@ -440,8 +448,7 @@ public class SCMContainerManager implements ContainerManager {
       synchronized (pipeline) {
         containerIDs = getContainersForOwner(pipeline, owner);
 
-        if (containerIDs.size() < numContainerPerVolume * pipelineManager.
-                getNumHealthyVolumes(pipeline)) {
+        if (containerIDs.size() < getOpenContainerCountPerPipeline(pipeline)) {
           containerInfo =
                   containerStateManager.allocateContainer(
                           pipelineManager, owner, pipeline);
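
A self-contained sketch of the math behind getOpenContainerCountPerPipeline
(the standalone class is illustrative, not part of the patch):

    // Mirrors the ceil() division above: the per-datanode container budget
    // is spread evenly across the datanode's pipelines, rounded up.
    public final class OpenContainerBudget {
      private OpenContainerBudget() {
      }

      public static int openContainersPerPipeline(int numContainerPerVolume,
          int minHealthyVolumes, int minPipelineLimit) {
        int minContainerCountPerDn = numContainerPerVolume * minHealthyVolumes;
        return (int) Math.ceil(
            (double) minContainerCountPerDn / minPipelineLimit);
      }

      public static void main(String[] args) {
        // 3 containers per volume, 10 healthy volumes, 4 pipelines
        // -> ceil(30 / 4) = 8 open containers per pipeline.
        System.out.println(openContainersPerPipeline(3, 10, 4));
      }
    }
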
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index b39440f..2e7bdeb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hdds.scm.node;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
 import org.apache.hadoop.util.Time;
 
 import java.util.Collections;
@@ -40,6 +42,7 @@ public class DatanodeInfo extends DatanodeDetails {
   private long lastStatsUpdatedTime;
 
   private List<StorageReportProto> storageReports;
+  private List<MetadataStorageReportProto> metadataStorageReports;
 
   /**
    * Constructs DatanodeInfo from DatanodeDetails.
@@ -51,6 +54,7 @@ public class DatanodeInfo extends DatanodeDetails {
     this.lock = new ReentrantReadWriteLock();
     this.lastHeartbeatTime = Time.monotonicNow();
     this.storageReports = Collections.emptyList();
+    this.metadataStorageReports = Collections.emptyList();
   }
 
   /**
@@ -95,6 +99,22 @@ public class DatanodeInfo extends DatanodeDetails {
   }
 
   /**
+   * Updates the datanode metadata storage reports.
+   *
+   * @param reports list of metadata storage reports
+   */
+  public void updateMetaDataStorageReports(
+      List<MetadataStorageReportProto> reports) {
+    try {
+      lock.writeLock().lock();
+      lastStatsUpdatedTime = Time.monotonicNow();
+      metadataStorageReports = reports;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
    * Returns the storage reports associated with this datanode.
    *
    * @return list of storage report
@@ -122,6 +142,19 @@ public class DatanodeInfo extends DatanodeDetails {
   }
 
   /**
+   * Returns count of healthy metadata volumes reported from datanode.
+   * @return count of healthy metadata log volumes
+   */
+  public int getMetaDataVolumeCount() {
+    try {
+      lock.readLock().lock();
+      return metadataStorageReports.size();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
    * Returns count of failed volumes reported from datanode.
    * @return count of failed volumes
    */
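
A hedged usage sketch of the new accessors, given an existing DatanodeInfo
instance; the setStorageLocation call assumes MetadataStorageReportProto
carries a storageLocation field like the data-volume StorageReportProto
does, and the path is illustrative:

    import java.util.Collections;
    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;

    MetadataStorageReportProto report = MetadataStorageReportProto.newBuilder()
        .setStorageLocation("/data/ratis/log-0")
        .build();
    datanodeInfo.updateMetaDataStorageReports(Collections.singletonList(report));
    // One report arrives per raft log volume, so the list size doubles as
    // the metadata volume count used for the pipeline limit.
    int metaVolumes = datanodeInfo.getMetaDataVolumeCount(); // 1
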
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index df21b84..4af2357 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -214,5 +214,9 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    */
   NetworkTopology getClusterNetworkTopologyMap();
 
-  int getNumHealthyVolumes(List <DatanodeDetails> dnList);
+  int minHealthyVolumeNum(List <DatanodeDetails> dnList);
+
+  int pipelineLimit(DatanodeDetails dn);
+
+  int minPipelineLimit(List<DatanodeDetails> dn);
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 1a0cec3..328f271 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -103,6 +104,8 @@ public class SCMNodeManager implements NodeManager {
   private final boolean useHostname;
   private final ConcurrentHashMap<String, Set<String>> dnsToUuidMap =
       new ConcurrentHashMap<>();
+  private final int numPipelinesPerMetadataVolume;
+  private final int heavyNodeCriteria;
 
   /**
    * Constructs SCM machine Manager.
@@ -130,6 +133,11 @@ public class SCMNodeManager implements NodeManager {
     this.useHostname = conf.getBoolean(
         DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+    this.numPipelinesPerMetadataVolume =
+        conf.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME,
+            ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT);
+    String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
+    this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
   }
 
   private void registerMXBean() {
@@ -363,6 +371,8 @@ public class SCMNodeManager implements NodeManager {
       DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
       if (nodeReport != null) {
         datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
+        datanodeInfo.updateMetaDataStorageReports(nodeReport.
+            getMetadataStorageReportList());
         metrics.incNumNodeReportProcessed();
       }
     } catch (NodeNotFoundException e) {
@@ -511,11 +521,11 @@ public class SCMNodeManager implements NodeManager {
   }
 
   /**
-   * Returns the max of no healthy volumes reported out of the set
+   * Returns the minimum number of healthy volumes reported out of the set
    * of datanodes constituting the pipeline.
    */
   @Override
-  public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+  public int minHealthyVolumeNum(List<DatanodeDetails> dnList) {
     List<Integer> volumeCountList = new ArrayList<>(dnList.size());
     for (DatanodeDetails dn : dnList) {
       try {
@@ -527,7 +537,44 @@ public class SCMNodeManager implements NodeManager {
       }
     }
     Preconditions.checkArgument(!volumeCountList.isEmpty());
-    return Collections.max(volumeCountList);
+    return Collections.min(volumeCountList);
+  }
+
+  /**
+   * Returns the pipeline limit for the datanode.
+   * If the datanode pipeline limit is set, it is used as the max
+   * pipeline limit.
+   * If the pipeline limit is not set, the max pipeline limit is
+   * derived from the number of raft log volumes reported, provided
+   * the datanode has at least one healthy data volume.
+   */
+  @Override
+  public int pipelineLimit(DatanodeDetails dn) {
+    try {
+      if (heavyNodeCriteria > 0) {
+        return heavyNodeCriteria;
+      } else if (nodeStateManager.getNode(dn).getHealthyVolumeCount() > 0) {
+        return numPipelinesPerMetadataVolume *
+            nodeStateManager.getNode(dn).getMetaDataVolumeCount();
+      }
+    } catch (NodeNotFoundException e) {
+      LOG.warn("Cannot compute pipeline limit, datanode {} not found.",
+          dn.getUuid());
+    }
+    return 0;
+  }
+
+  /**
+   * Returns the minimum pipeline limit across a set of datanodes.
+   */
+  @Override
+  public int minPipelineLimit(List<DatanodeDetails> dnList) {
+    List<Integer> pipelineCountList = new ArrayList<>(dnList.size());
+    for (DatanodeDetails dn : dnList) {
+      pipelineCountList.add(pipelineLimit(dn));
+    }
+    Preconditions.checkArgument(!pipelineCountList.isEmpty());
+    return Collections.min(pipelineCountList);
   }
 
   /**
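
Restating the limit resolution as a standalone hedged sketch (class,
method, and parameter names are illustrative):

    public final class PipelineLimitExample {
      // Mirrors SCMNodeManager#pipelineLimit: an explicit datanode limit
      // wins; otherwise the limit derives from the raft log disk count,
      // but only for nodes with at least one healthy data volume.
      static int pipelineLimit(int heavyNodeCriteria, int healthyDataVolumes,
          int metadataVolumes, int pipelinesPerMetadataVolume) {
        if (heavyNodeCriteria > 0) {
          return heavyNodeCriteria;
        } else if (healthyDataVolumes > 0) {
          return pipelinesPerMetadataVolume * metadataVolumes;
        }
        return 0; // node unknown or no healthy data disk
      }

      public static void main(String[] args) {
        System.out.println(pipelineLimit(0, 10, 2, 2)); // 4 (derived)
        System.out.println(pipelineLimit(3, 10, 2, 2)); // 3 (override)
      }
    }

minPipelineLimit() is then simply Collections.min over these per-node
values for the datanodes in the pipeline.
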
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 857f76e..0cb905e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -89,7 +89,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean,
 
   void incNumBlocksAllocatedMetric(PipelineID id);
 
-  int getNumHealthyVolumes(Pipeline pipeline);
+  int minHealthyVolumeNum(Pipeline pipeline);
+
+  int minPipelineLimit(Pipeline pipeline);
 
   /**
    * Activates a dormant pipeline.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 84efdc2..b9441be 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -75,9 +75,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     this.nodeManager = nodeManager;
     this.conf = conf;
     this.stateManager = stateManager;
-    this.heavyNodeCriteria = conf.getInt(
-        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
-        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
+    String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
+    this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
   }
 
  int currentPipelineCount(DatanodeDetails datanodeDetails, int nodesRequired) {
@@ -149,7 +148,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
         .map(d ->
             new DnWithPipelines(d, currentPipelineCount(d, nodesRequired)))
         .filter(d ->
-            ((d.getPipelines() < heavyNodeCriteria) || heavyNodeCriteria == 0))
+            (d.getPipelines() < nodeManager.pipelineLimit(d.getDn())))
         .sorted(Comparator.comparingInt(DnWithPipelines::getPipelines))
         .map(d -> d.getDn())
         .collect(Collectors.toList());
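
Note the behavioral shift in the filter above: previously a global
heavyNodeCriteria of 0 meant "no limit"; now each candidate is checked
against its own nodeManager.pipelineLimit(). For example, a datanode with
2 raft log disks and the default of 2 pipelines per disk is skipped once
it already carries 4 pipelines.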
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 4d91541..e39f141 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -64,9 +64,9 @@ public class RatisPipelineProvider extends PipelineProvider {
     this.pipelineNumberLimit = conf.getInt(
         ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT,
         ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT);
-    this.maxPipelinePerDatanode = conf.getInt(
-        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
-        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
+    String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
+    this.maxPipelinePerDatanode = dnLimit == null ? 0 :
+        Integer.parseInt(dnLimit);
   }
 
   private boolean exceedPipelineNumberLimit(ReplicationFactor factor) {
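
One subtlety in the change above: maxPipelinePerDatanode now stays 0 when
ozone.datanode.pipeline.limit is unset (previously it fell back to the
default of 2), so the effective per-node cap is left to be derived from
the metadata volume count, e.g. 4 for a node with 2 raft log disks.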
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index f072ebb..db7fcae 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -672,13 +672,23 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   /**
-   * returns max count of healthy volumes from the set of
+   * returns min number of healthy volumes from the set of
    * datanodes constituting the pipeline.
    * @param  pipeline
    * @return healthy volume count
    */
-  public int getNumHealthyVolumes(Pipeline pipeline) {
-    return nodeManager.getNumHealthyVolumes(pipeline.getNodes());
+  public int minHealthyVolumeNum(Pipeline pipeline) {
+    return nodeManager.minHealthyVolumeNum(pipeline.getNodes());
+  }
+
+  /**
+   * returns the min pipeline limit from the set of
+   * datanodes constituting the pipeline.
+   * @param  pipeline
+   * @return min pipeline limit
+   */
+  public int minPipelineLimit(Pipeline pipeline) {
+    return nodeManager.minPipelineLimit(pipeline.getNodes());
   }
 
   protected ReadWriteLock getLock() {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index a72031c..6b6e8d8 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -245,6 +245,7 @@ public class TestBlockManager {
   public void testBlockDistribution() throws Exception {
     int threadCount = numContainerPerOwnerInPipeline *
             numContainerPerOwnerInPipeline;
+    nodeManager.setNumPipelinePerDatanode(1);
     List<ExecutorService> executors = new ArrayList<>(threadCount);
     for (int i = 0; i < threadCount; i++) {
       executors.add(Executors.newSingleThreadExecutor());
@@ -304,6 +305,7 @@ public class TestBlockManager {
     int threadCount = numContainerPerOwnerInPipeline *
             numContainerPerOwnerInPipeline;
     nodeManager.setNumHealthyVolumes(numContainerPerOwnerInPipeline);
+    nodeManager.setNumPipelinePerDatanode(1);
     List<ExecutorService> executors = new ArrayList<>(threadCount);
     for (int i = 0; i < threadCount; i++) {
       executors.add(Executors.newSingleThreadExecutor());
@@ -365,6 +367,71 @@ public class TestBlockManager {
   }
 
   @Test
+  public void testBlockDistributionWithMultipleRaftLogDisks() throws Exception {
+    int threadCount = numContainerPerOwnerInPipeline *
+        numContainerPerOwnerInPipeline;
+    int numMetaDataVolumes = 2;
+    nodeManager.setNumHealthyVolumes(numContainerPerOwnerInPipeline);
+    nodeManager.setNumMetaDataVolumes(numMetaDataVolumes);
+    List<ExecutorService> executors = new ArrayList<>(threadCount);
+    for (int i = 0; i < threadCount; i++) {
+      executors.add(Executors.newSingleThreadExecutor());
+    }
+    pipelineManager.createPipeline(type, factor);
+    TestUtils.openAllRatisPipelines(pipelineManager);
+    Map<Long, List<AllocatedBlock>> allocatedBlockMap =
+        new ConcurrentHashMap<>();
+    List<CompletableFuture<AllocatedBlock>> futureList =
+        new ArrayList<>(threadCount);
+    for (int i = 0; i < threadCount; i++) {
+      final CompletableFuture<AllocatedBlock> future =
+          new CompletableFuture<>();
+      CompletableFuture.supplyAsync(() -> {
+        try {
+          List<AllocatedBlock> blockList;
+          AllocatedBlock block = blockManager
+              .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor,
+                  OzoneConsts.OZONE,
+                  new ExcludeList());
+          long containerId = block.getBlockID().getContainerID();
+          if (!allocatedBlockMap.containsKey(containerId)) {
+            blockList = new ArrayList<>();
+          } else {
+            blockList = allocatedBlockMap.get(containerId);
+          }
+          blockList.add(block);
+          allocatedBlockMap.put(containerId, blockList);
+          future.complete(block);
+        } catch (IOException e) {
+          future.completeExceptionally(e);
+        }
+        return future;
+      }, executors.get(i));
+      futureList.add(future);
+    }
+    try {
+      CompletableFuture
+          .allOf(futureList.toArray(
+              new CompletableFuture[futureList.size()])).get();
+      Assert.assertTrue(
+          pipelineManager.getPipelines(type).size() == 1);
+      Pipeline pipeline = pipelineManager.getPipelines(type).get(0);
+      // the pipeline per raft log disk config is set to 1 by default
+      int numContainers = (int)Math.ceil((double)
+              (numContainerPerOwnerInPipeline *
+                  numContainerPerOwnerInPipeline)/numMetaDataVolumes);
+      Assert.assertTrue(numContainers == pipelineManager.
+          getNumberOfContainers(pipeline.getId()));
+      Assert.assertTrue(
+          allocatedBlockMap.size() == numContainers);
+      Assert.assertTrue(allocatedBlockMap.
+          values().size() == numContainers);
+    } catch (Exception e) {
+      Assert.fail("testBlockDistributionWithMultipleRaftLogDisks failed");
+    }
+  }
+
+  @Test
   public void testAllocateOversizedBlock() throws Exception {
     long size = 6 * GB;
     thrown.expectMessage("Unsupported block size");
@@ -434,6 +501,8 @@ public class TestBlockManager {
   @Test(timeout = 10000)
   public void testMultipleBlockAllocationWithClosedContainer()
       throws IOException, TimeoutException, InterruptedException {
+    nodeManager.setNumPipelinePerDatanode(1);
+    nodeManager.setNumHealthyVolumes(1);
     // create pipelines
     for (int i = 0;
          i < nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size() / factor
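
Working the expected count in testBlockDistributionWithMultipleRaftLogDisks
through with illustrative numbers: if numContainerPerOwnerInPipeline were 3,
threadCount would be 9 and the single pipeline would be expected to hold
ceil(9 / 2) = 5 open containers, matching the ceil() division asserted in
the test.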
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 4b8b37d..7aca0f3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -68,6 +68,7 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
  * Test Helper for testing container Mapping.
  */
 public class MockNodeManager implements NodeManager {
+  public static final int NUM_PIPELINE_PER_METADATA_DISK = 2;
   private final static NodeData[] NODES = {
       new NodeData(10L * OzoneConsts.TB, OzoneConsts.GB),
       new NodeData(64L * OzoneConsts.TB, 100 * OzoneConsts.GB),
@@ -93,6 +94,8 @@ public class MockNodeManager implements NodeManager {
   private NetworkTopology clusterMap;
   private ConcurrentMap<String, Set<String>> dnsToUuidMap;
   private int numHealthyDisksPerDatanode;
+  private int numRaftLogDisksPerDatanode;
+  private int numPipelinePerDatanode;
 
   public MockNodeManager(NetworkTopologyImpl clusterMap,
                          List<DatanodeDetails> nodes,
@@ -123,6 +126,9 @@ public class MockNodeManager implements NodeManager {
     safemode = false;
     this.commandMap = new HashMap<>();
     numHealthyDisksPerDatanode = 1;
+    numRaftLogDisksPerDatanode = 1;
+    numPipelinePerDatanode = numRaftLogDisksPerDatanode *
+        NUM_PIPELINE_PER_METADATA_DISK;
   }
 
   public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
@@ -585,14 +591,34 @@ public class MockNodeManager implements NodeManager {
   }
 
   @Override
-  public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+  public int minHealthyVolumeNum(List<DatanodeDetails> dnList) {
     return numHealthyDisksPerDatanode;
   }
 
+  @Override
+  public int pipelineLimit(DatanodeDetails dn) {
+    // by default 2: one single-node pipeline and one three-node pipeline
+    return numPipelinePerDatanode;
+  }
+
+  @Override
+  public int minPipelineLimit(List<DatanodeDetails> dn) {
+    // by default 2: one single-node pipeline and one three-node pipeline
+    return numPipelinePerDatanode;
+  }
+
+  public void setNumPipelinePerDatanode(int value) {
+    numPipelinePerDatanode = value;
+  }
+
   public void setNumHealthyVolumes(int value) {
     numHealthyDisksPerDatanode = value;
   }
 
+  public void setNumMetaDataVolumes(int value) {
+    numRaftLogDisksPerDatanode = value;
+  }
+
   /**
    * A class to declare some values for the nodes so that our tests
    * won't fail.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 7a58d46..3f3c4ae 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -872,7 +872,7 @@ public class TestSCMNodeManager {
           .getScmUsed().get());
       assertEquals(remaining * nodeCount, (long) nodeManager.getStats()
           .getRemaining().get());
-      assertEquals(1, nodeManager.getNumHealthyVolumes(dnList));
+      assertEquals(1, nodeManager.minHealthyVolumeNum(dnList));
       dnList.clear();
     }
   }
@@ -917,7 +917,7 @@ public class TestSCMNodeManager {
 
       assertEquals(1, nodeManager.getNodeCount(HEALTHY));
       assertEquals(volumeCount / 2,
-              nodeManager.getNumHealthyVolumes(dnList));
+              nodeManager.minHealthyVolumeNum(dnList));
       dnList.clear();
     }
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 1274608..f024fc5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -86,6 +86,7 @@ public class TestPipelinePlacementPolicy {
         false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
     conf = new OzoneConfiguration();
     conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, PIPELINE_LOAD_LIMIT);
+    nodeManager.setNumPipelinePerDatanode(PIPELINE_LOAD_LIMIT);
     stateManager = new PipelineStateManager();
     placementPolicy = new PipelinePlacementPolicy(
         nodeManager, stateManager, conf);
@@ -193,7 +194,8 @@ public class TestPipelinePlacementPolicy {
         nodeManager.addPipeline(pipeline);
         stateManager.addPipeline(pipeline);
       } catch (SCMException e) {
-        break;
+        throw e;
+        //break;
       }
     }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 25957d8..d14e468 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -70,6 +70,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.InOrder;
 
+
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -275,6 +276,7 @@ public class TestSCMPipelineManager {
         new SCMPipelineManager(conf, nodeManagerMock,
             scmMetadataStore.getPipelineTable(), new EventQueue());
     pipelineManager.allowPipelineCreation();
+    nodeManagerMock.setNumPipelinePerDatanode(1);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManagerMock,
             pipelineManager.getStateManager(), conf);
@@ -336,6 +338,78 @@ public class TestSCMPipelineManager {
   }
 
   @Test
+  public void testPipelineLimit() throws Exception {
+    int numMetaDataVolumes = 2;
+    final OzoneConfiguration config = new OzoneConfiguration();
+    config.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        false);
+    // Turning off this config ensures that pipeline creation is determined
+    // by the metadata volume count.
+    config.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 0);
+    MockNodeManager nodeManagerMock = new MockNodeManager(true,
+        3);
+    nodeManagerMock.setNumMetaDataVolumes(numMetaDataVolumes);
+    int pipelinePerDn = numMetaDataVolumes *
+        MockNodeManager.NUM_PIPELINE_PER_METADATA_DISK;
+    nodeManagerMock.setNumPipelinePerDatanode(pipelinePerDn);
+    SCMPipelineManager pipelineManager =
+        new SCMPipelineManager(config, nodeManagerMock,
+            scmMetadataStore.getPipelineTable(), new EventQueue());
+    pipelineManager.allowPipelineCreation();
+    PipelineProvider mockRatisProvider =
+        new MockRatisPipelineProvider(nodeManagerMock,
+            pipelineManager.getStateManager(), config);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        mockRatisProvider);
+
+    MetricsRecordBuilder metrics = getMetrics(
+        SCMPipelineMetrics.class.getSimpleName());
+    long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
+        metrics);
+    Assert.assertEquals(0, numPipelineAllocated);
+
+    // max limit on number of pipelines is 4
+    for (int i = 0; i < pipelinePerDn; i++) {
+      Pipeline pipeline = pipelineManager
+          .createPipeline(HddsProtos.ReplicationType.RATIS,
+              HddsProtos.ReplicationFactor.THREE);
+      Assert.assertNotNull(pipeline);
+    }
+
+    metrics = getMetrics(
+        SCMPipelineMetrics.class.getSimpleName());
+    numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+    Assert.assertEquals(4, numPipelineAllocated);
+
+    long numPipelineCreateFailed = getLongCounter(
+        "NumPipelineCreationFailed", metrics);
+    Assert.assertEquals(0, numPipelineCreateFailed);
+    // This should fail.
+    try {
+      pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
+          HddsProtos.ReplicationFactor.THREE);
+      fail();
+    } catch (SCMException ioe) {
+      // pipeline creation failed this time.
+      Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+          ioe.getResult());
+    }
+
+    metrics = getMetrics(
+        SCMPipelineMetrics.class.getSimpleName());
+    numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+    Assert.assertEquals(4, numPipelineAllocated);
+
+    numPipelineCreateFailed = getLongCounter(
+        "NumPipelineCreationFailed", metrics);
+    Assert.assertEquals(1, numPipelineCreateFailed);
+
+    // clean up
+    pipelineManager.close();
+  }
+
+  @Test
   public void testActivateDeactivatePipeline() throws IOException {
     final SCMPipelineManager pipelineManager =
         new SCMPipelineManager(conf, nodeManager,
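
The arithmetic behind testPipelineLimit above: 2 metadata volumes x
NUM_PIPELINE_PER_METADATA_DISK (2) gives a per-datanode cap of 4, and with
only 3 datanodes every THREE-factor pipeline uses all of them, so the fifth
createPipeline() call finds no eligible node and fails with
FAILED_TO_FIND_SUITABLE_NODE, exactly as asserted.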
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index a9b879f..6d088fe 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -345,7 +345,17 @@ public class ReplicationNodeManagerMock implements NodeManager {
   }
 
   @Override
-  public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+  public int minHealthyVolumeNum(List<DatanodeDetails> dnList) {
+    return 0;
+  }
+
+  @Override
+  public int pipelineLimit(DatanodeDetails dn) {
+    return 0;
+  }
+
+  @Override
+  public int minPipelineLimit(List<DatanodeDetails> dn) {
     return 0;
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
index 4fa5ea4..2d1acf7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
@@ -72,6 +72,9 @@ public class TestContainerMapper {
     dbPath = GenericTestUtils.getRandomizedTempPath();
     conf.set(OZONE_OM_DB_DIRS, dbPath);
     conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, "100MB");
+    // By default, 2 pipelines are created. Setting the value to 6 ensures
+    // each pipeline can have 3 containers open.
+    conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 6);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
         .setScmId(SCM_ID)


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org
