This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f3deefbe0 HDDS-8982. Log flooded by WritableRatisContainerProvider if 
pipeline's nodes are not found (#5911)
3f3deefbe0 is described below

commit 3f3deefbe05b3b84a482b24d47e6825f6137698d
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Fri Jan 5 09:41:46 2024 +0100

    HDDS-8982. Log flooded by WritableRatisContainerProvider if pipeline's 
nodes are not found (#5911)
---
 .../hdds/scm/PipelineRequestInformation.java       |   2 +-
 .../pipeline/WritableRatisContainerProvider.java   | 173 ++++++++++----------
 .../TestWritableRatisContainerProvider.java        | 174 +++++++++++++++++++++
 .../org/apache/hadoop/fs/ozone/TestSafeMode.java   |  15 ++
 4 files changed, 270 insertions(+), 94 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineRequestInformation.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineRequestInformation.java
index ac0cfbe57b..4a4d91b2ff 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineRequestInformation.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineRequestInformation.java
@@ -22,7 +22,7 @@ package org.apache.hadoop.hdds.scm;
  * The information of the request of pipeline.
  */
 public final class PipelineRequestInformation {
-  private long size;
+  private final long size;
 
   /**
    * Builder for PipelineRequestInformation.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
index d9474f156d..f9fc651f2f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -79,97 +80,64 @@ public class WritableRatisContainerProvider
       So we can use different kind of policies.
     */
 
-    ContainerInfo containerInfo = null;
     String failureReason = null;
 
     //TODO we need to continue the refactor to use repConfig everywhere
     //in downstream managers.
 
+    PipelineRequestInformation req =
+        PipelineRequestInformation.Builder.getBuilder().setSize(size).build();
 
-    while (true) {
-      List<Pipeline> availablePipelines;
-      Pipeline pipeline;
-      // Acquire pipeline manager lock, to avoid any updates to pipeline
-      // while allocate container happens. This is to avoid scenario like
-      // mentioned in HDDS-5655.
-      pipelineManager.acquireReadLock();
-      try {
-        availablePipelines =
-                findPipelinesByState(repConfig,
-                        excludeList,
-                        Pipeline.PipelineState.OPEN);
-        if (availablePipelines.size() != 0) {
-          containerInfo = selectContainer(availablePipelines, size, owner,
-              excludeList);
-        }
-        if (containerInfo != null) {
-          return containerInfo;
-        }
-      } finally {
-        pipelineManager.releaseReadLock();
-      }
+    ContainerInfo containerInfo =
+        getContainer(repConfig, owner, excludeList, req);
+    if (containerInfo != null) {
+      return containerInfo;
+    }
 
-      if (availablePipelines.size() == 0) {
+    try {
+      // TODO: #CLUTIL Remove creation logic when all replication types
+      //  and factors are handled by pipeline creator
+      Pipeline pipeline = pipelineManager.createPipeline(repConfig);
+
+      // wait until pipeline is ready
+      pipelineManager.waitPipelineReady(pipeline.getId(), 0);
+
+    } catch (SCMException se) {
+      LOG.warn("Pipeline creation failed for repConfig {} " +
+          "Datanodes may be used up. Try to see if any pipeline is in " +
+              "ALLOCATED state, and then will wait for it to be OPEN",
+              repConfig, se);
+      List<Pipeline> allocatedPipelines = findPipelinesByState(repConfig,
+              excludeList,
+              Pipeline.PipelineState.ALLOCATED);
+      if (!allocatedPipelines.isEmpty()) {
+        List<PipelineID> allocatedPipelineIDs =
+                allocatedPipelines.stream()
+                        .map(p -> p.getId())
+                        .collect(Collectors.toList());
         try {
-          // TODO: #CLUTIL Remove creation logic when all replication types
-          //  and factors are handled by pipeline creator
-          pipeline = pipelineManager.createPipeline(repConfig);
-
-          // wait until pipeline is ready
-          pipelineManager.waitPipelineReady(pipeline.getId(), 0);
-
-        } catch (SCMException se) {
-          LOG.warn("Pipeline creation failed for repConfig {} " +
-              "Datanodes may be used up. Try to see if any pipeline is in " +
-                  "ALLOCATED state, and then will wait for it to be OPEN",
-                  repConfig, se);
-          List<Pipeline> allocatedPipelines = findPipelinesByState(repConfig,
-                  excludeList,
-                  Pipeline.PipelineState.ALLOCATED);
-          if (!allocatedPipelines.isEmpty()) {
-            List<PipelineID> allocatedPipelineIDs =
-                    allocatedPipelines.stream()
-                            .map(p -> p.getId())
-                            .collect(Collectors.toList());
-            try {
-              pipelineManager
-                      .waitOnePipelineReady(allocatedPipelineIDs, 0);
-            } catch (IOException e) {
-              LOG.warn("Waiting for one of pipelines {} to be OPEN failed. ",
-                      allocatedPipelineIDs, e);
-              failureReason = "Waiting for one of pipelines to be OPEN failed. "
-                  + e.getMessage();
-            }
-          } else {
-            failureReason = se.getMessage();
-          }
+          pipelineManager
+                  .waitOnePipelineReady(allocatedPipelineIDs, 0);
         } catch (IOException e) {
-          LOG.warn("Pipeline creation failed for repConfig: {}. "
-              + "Retrying get pipelines call once.", repConfig, e);
-          failureReason = e.getMessage();
-        }
-
-        pipelineManager.acquireReadLock();
-        try {
-          // If Exception occurred or successful creation of pipeline do one
-          // final try to fetch pipelines.
-          availablePipelines = findPipelinesByState(repConfig,
-                  excludeList,
-                  Pipeline.PipelineState.OPEN);
-          if (availablePipelines.size() == 0) {
-            LOG.info("Could not find available pipeline of repConfig: {} "
-                + "even after retrying", repConfig);
-            break;
-          }
-          containerInfo = selectContainer(availablePipelines, size, owner,
-              excludeList);
-          if (containerInfo != null) {
-            return containerInfo;
-          }
-        } finally {
-          pipelineManager.releaseReadLock();
+          LOG.warn("Waiting for one of pipelines {} to be OPEN failed. ",
+                  allocatedPipelineIDs, e);
+          failureReason = "Waiting for one of pipelines to be OPEN failed. "
+              + e.getMessage();
         }
+      } else {
+        failureReason = se.getMessage();
       }
+    } catch (IOException e) {
+      LOG.warn("Pipeline creation failed for repConfig: {}. "
+          + "Retrying get pipelines call once.", repConfig, e);
+      failureReason = e.getMessage();
+    }
+
+    // If Exception occurred or successful creation of pipeline do one
+    // final try to fetch pipelines.
+    containerInfo = getContainer(repConfig, owner, excludeList, req);
+    if (containerInfo != null) {
+      return containerInfo;
     }
 
     // we have tried all strategies we know but somehow we are not able
@@ -182,6 +150,22 @@ public class WritableRatisContainerProvider
             + ", replicationConfig: " + repConfig + ". " + failureReason);
   }
 
+  @Nullable
+  private ContainerInfo getContainer(ReplicationConfig repConfig, String owner,
+      ExcludeList excludeList, PipelineRequestInformation req) {
+    // Acquire pipeline manager lock, to avoid any updates to pipeline
+    // while allocate container happens. This is to avoid scenario like
+    // mentioned in HDDS-5655.
+    pipelineManager.acquireReadLock();
+    try {
+      List<Pipeline> availablePipelines = findPipelinesByState(repConfig,
+          excludeList, Pipeline.PipelineState.OPEN);
+      return selectContainer(availablePipelines, req, owner, excludeList);
+    } finally {
+      pipelineManager.releaseReadLock();
+    }
+  }
+
   private List<Pipeline> findPipelinesByState(
           final ReplicationConfig repConfig,
           final ExcludeList excludeList,
@@ -197,23 +181,26 @@ public class WritableRatisContainerProvider
     return pipelines;
   }
 
-  private ContainerInfo selectContainer(List<Pipeline> availablePipelines,
-      long size, String owner, ExcludeList excludeList) {
-    Pipeline pipeline;
-    ContainerInfo containerInfo;
+  private @Nullable ContainerInfo selectContainer(
+      List<Pipeline> availablePipelines, PipelineRequestInformation req,
+      String owner, ExcludeList excludeList) {
 
-    PipelineRequestInformation pri =
-        PipelineRequestInformation.Builder.getBuilder().setSize(size)
-                .build();
-    pipeline = pipelineChoosePolicy.choosePipeline(
-            availablePipelines, pri);
+    while (!availablePipelines.isEmpty()) {
+      Pipeline pipeline = pipelineChoosePolicy.choosePipeline(
+          availablePipelines, req);
 
-    // look for OPEN containers that match the criteria.
-    containerInfo = containerManager.getMatchingContainer(size, owner,
-        pipeline, excludeList.getContainerIds());
+      // look for OPEN containers that match the criteria.
+      final ContainerInfo containerInfo = containerManager.getMatchingContainer(
+          req.getSize(), owner, pipeline, excludeList.getContainerIds());
 
-    return containerInfo;
+      if (containerInfo != null) {
+        return containerInfo;
+      }
+
+      availablePipelines.remove(pipeline);
+    }
 
+    return null;
   }
 
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java
new file mode 100644
index 0000000000..06a76338e5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class TestWritableRatisContainerProvider {
+
+  private static final ReplicationConfig REPLICATION_CONFIG =
+      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+  private static final String OWNER = "owner";
+  private static final int CONTAINER_SIZE = 1234;
+  private static final ExcludeList NO_EXCLUSION = new ExcludeList();
+
+  private final OzoneConfiguration conf = new OzoneConfiguration();
+  private final PipelineChoosePolicy policy = new RandomPipelineChoosePolicy();
+  private final AtomicLong containerID = new AtomicLong(1);
+
+  @Mock
+  private PipelineManager pipelineManager;
+
+  @Mock
+  private ContainerManager containerManager;
+
+  @Test
+  void returnsExistingContainer() throws Exception {
+    Pipeline pipeline = MockPipeline.createPipeline(3);
+    ContainerInfo existingContainer = pipelineHasContainer(pipeline);
+
+    existingPipelines(pipeline);
+
+    ContainerInfo container = createSubject().getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION);
+
+    assertSame(existingContainer, container);
+    verifyPipelineNotCreated();
+  }
+
+  @Test
+  void skipsPipelineWithoutContainer() throws Exception {
+    Pipeline pipeline = MockPipeline.createPipeline(3);
+    ContainerInfo existingContainer = pipelineHasContainer(pipeline);
+
+    Pipeline pipelineWithoutContainer = MockPipeline.createPipeline(3);
+    existingPipelines(pipelineWithoutContainer, pipeline);
+
+    ContainerInfo container = createSubject().getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION);
+
+    assertSame(existingContainer, container);
+    verifyPipelineNotCreated();
+  }
+
+  @Test
+  void createsNewContainerIfNoneFound() throws Exception {
+    ContainerInfo newContainer = createNewContainerOnDemand();
+
+    ContainerInfo container = createSubject().getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION);
+
+    assertSame(newContainer, container);
+    verifyPipelineCreated();
+  }
+
+  @Test
+  void failsIfContainerCannotBeCreated() throws Exception {
+    throwWhenCreatePipeline();
+
+    assertThrows(IOException.class,
+        () -> createSubject().getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION));
+
+    verifyPipelineCreated();
+  }
+
+  private void existingPipelines(Pipeline... pipelines) {
+    existingPipelines(asList(pipelines));
+  }
+
+  private void existingPipelines(List<Pipeline> pipelines) {
+    when(pipelineManager.getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet()))
+        .thenReturn(pipelines);
+  }
+
+  private ContainerInfo pipelineHasContainer(Pipeline pipeline) {
+    ContainerInfo container = new ContainerInfo.Builder()
+        .setContainerID(containerID.getAndIncrement())
+        .setPipelineID(pipeline.getId())
+        .build();
+
+    when(containerManager.getMatchingContainer(CONTAINER_SIZE, OWNER, pipeline, emptySet()))
+        .thenReturn(container);
+
+    return container;
+  }
+
+  private ContainerInfo createNewContainerOnDemand() throws IOException {
+    Pipeline newPipeline = MockPipeline.createPipeline(3);
+    when(pipelineManager.createPipeline(REPLICATION_CONFIG))
+        .thenReturn(newPipeline);
+
+    when(pipelineManager.getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet()))
+        .thenReturn(emptyList())
+        .thenReturn(singletonList(newPipeline));
+
+    return pipelineHasContainer(newPipeline);
+  }
+
+  private void throwWhenCreatePipeline() throws IOException {
+    when(pipelineManager.createPipeline(REPLICATION_CONFIG))
+        .thenThrow(new SCMException(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE));
+  }
+
+  private WritableRatisContainerProvider createSubject() {
+    return new WritableRatisContainerProvider(conf,
+        pipelineManager, containerManager, policy);
+  }
+
+  private void verifyPipelineCreated() throws IOException {
+    verify(pipelineManager, times(2))
+        .getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet());
+    verify(pipelineManager)
+        .createPipeline(REPLICATION_CONFIG);
+  }
+
+  private void verifyPipelineNotCreated() throws IOException {
+    verify(pipelineManager, times(1))
+        .getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet());
+    verify(pipelineManager, never())
+        .createPipeline(REPLICATION_CONFIG);
+  }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSafeMode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSafeMode.java
index d97bc5b337..f285abfaf7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSafeMode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSafeMode.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.fs.ozone;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.SafeMode;
 import org.apache.hadoop.fs.SafeModeAction;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.MiniOzoneClusterProvider;
@@ -30,18 +32,24 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.function.Function;
 
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.ozone.OzoneConsts.MB;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@Timeout(60)
 class TestSafeMode {
 
   private static final String VOLUME = "vol";
@@ -106,6 +114,13 @@ class TestSafeMode {
       // force exit safe mode and verify that it's out of safe mode.
       subject.setSafeMode(SafeModeAction.FORCE_EXIT);
       assertFalse(subject.setSafeMode(SafeModeAction.GET));
+
+      // datanodes are still stopped
+      RatisReplicationConfig replication =
+          RatisReplicationConfig.getInstance(THREE);
+      assertThrows(IOException.class, () -> cluster.getStorageContainerManager()
+          .getWritableContainerFactory()
+          .getContainer(MB, replication, OZONE, new ExcludeList()));
     } finally {
       IOUtils.closeQuietly(fs);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to