This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 3f3deefbe0 HDDS-8982. Log flooded by WritableRatisContainerProvider if
pipeline's nodes are not found (#5911)
3f3deefbe0 is described below
commit 3f3deefbe05b3b84a482b24d47e6825f6137698d
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Fri Jan 5 09:41:46 2024 +0100
HDDS-8982. Log flooded by WritableRatisContainerProvider if pipeline's
nodes are not found (#5911)
---
.../hdds/scm/PipelineRequestInformation.java | 2 +-
.../pipeline/WritableRatisContainerProvider.java | 173 ++++++++++----------
.../TestWritableRatisContainerProvider.java | 174 +++++++++++++++++++++
.../org/apache/hadoop/fs/ozone/TestSafeMode.java | 15 ++
4 files changed, 270 insertions(+), 94 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineRequestInformation.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineRequestInformation.java
index ac0cfbe57b..4a4d91b2ff 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineRequestInformation.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineRequestInformation.java
@@ -22,7 +22,7 @@ package org.apache.hadoop.hdds.scm;
* The information of the request of pipeline.
*/
public final class PipelineRequestInformation {
- private long size;
+ private final long size;
/**
* Builder for PipelineRequestInformation.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
index d9474f156d..f9fc651f2f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
@@ -79,97 +80,64 @@ public class WritableRatisContainerProvider
So we can use different kind of policies.
*/
- ContainerInfo containerInfo = null;
String failureReason = null;
//TODO we need to continue the refactor to use repConfig everywhere
//in downstream managers.
+ PipelineRequestInformation req =
+ PipelineRequestInformation.Builder.getBuilder().setSize(size).build();
- while (true) {
- List<Pipeline> availablePipelines;
- Pipeline pipeline;
- // Acquire pipeline manager lock, to avoid any updates to pipeline
- // while allocate container happens. This is to avoid scenario like
- // mentioned in HDDS-5655.
- pipelineManager.acquireReadLock();
- try {
- availablePipelines =
- findPipelinesByState(repConfig,
- excludeList,
- Pipeline.PipelineState.OPEN);
- if (availablePipelines.size() != 0) {
- containerInfo = selectContainer(availablePipelines, size, owner,
- excludeList);
- }
- if (containerInfo != null) {
- return containerInfo;
- }
- } finally {
- pipelineManager.releaseReadLock();
- }
+ ContainerInfo containerInfo =
+ getContainer(repConfig, owner, excludeList, req);
+ if (containerInfo != null) {
+ return containerInfo;
+ }
- if (availablePipelines.size() == 0) {
+ try {
+ // TODO: #CLUTIL Remove creation logic when all replication types
+ // and factors are handled by pipeline creator
+ Pipeline pipeline = pipelineManager.createPipeline(repConfig);
+
+ // wait until pipeline is ready
+ pipelineManager.waitPipelineReady(pipeline.getId(), 0);
+
+ } catch (SCMException se) {
+ LOG.warn("Pipeline creation failed for repConfig {} " +
+ "Datanodes may be used up. Try to see if any pipeline is in " +
+ "ALLOCATED state, and then will wait for it to be OPEN",
+ repConfig, se);
+ List<Pipeline> allocatedPipelines = findPipelinesByState(repConfig,
+ excludeList,
+ Pipeline.PipelineState.ALLOCATED);
+ if (!allocatedPipelines.isEmpty()) {
+ List<PipelineID> allocatedPipelineIDs =
+ allocatedPipelines.stream()
+ .map(p -> p.getId())
+ .collect(Collectors.toList());
try {
- // TODO: #CLUTIL Remove creation logic when all replication types
- // and factors are handled by pipeline creator
- pipeline = pipelineManager.createPipeline(repConfig);
-
- // wait until pipeline is ready
- pipelineManager.waitPipelineReady(pipeline.getId(), 0);
-
- } catch (SCMException se) {
- LOG.warn("Pipeline creation failed for repConfig {} " +
- "Datanodes may be used up. Try to see if any pipeline is in " +
- "ALLOCATED state, and then will wait for it to be OPEN",
- repConfig, se);
- List<Pipeline> allocatedPipelines = findPipelinesByState(repConfig,
- excludeList,
- Pipeline.PipelineState.ALLOCATED);
- if (!allocatedPipelines.isEmpty()) {
- List<PipelineID> allocatedPipelineIDs =
- allocatedPipelines.stream()
- .map(p -> p.getId())
- .collect(Collectors.toList());
- try {
- pipelineManager
- .waitOnePipelineReady(allocatedPipelineIDs, 0);
- } catch (IOException e) {
- LOG.warn("Waiting for one of pipelines {} to be OPEN failed. ",
- allocatedPipelineIDs, e);
- failureReason = "Waiting for one of pipelines to be OPEN failed. "
- + e.getMessage();
- }
- } else {
- failureReason = se.getMessage();
- }
+ pipelineManager
+ .waitOnePipelineReady(allocatedPipelineIDs, 0);
} catch (IOException e) {
- LOG.warn("Pipeline creation failed for repConfig: {}. "
- + "Retrying get pipelines call once.", repConfig, e);
- failureReason = e.getMessage();
- }
-
- pipelineManager.acquireReadLock();
- try {
- // If Exception occurred or successful creation of pipeline do one
- // final try to fetch pipelines.
- availablePipelines = findPipelinesByState(repConfig,
- excludeList,
- Pipeline.PipelineState.OPEN);
- if (availablePipelines.size() == 0) {
- LOG.info("Could not find available pipeline of repConfig: {} "
- + "even after retrying", repConfig);
- break;
- }
- containerInfo = selectContainer(availablePipelines, size, owner,
- excludeList);
- if (containerInfo != null) {
- return containerInfo;
- }
- } finally {
- pipelineManager.releaseReadLock();
+ LOG.warn("Waiting for one of pipelines {} to be OPEN failed. ",
+ allocatedPipelineIDs, e);
+ failureReason = "Waiting for one of pipelines to be OPEN failed. "
+ + e.getMessage();
}
+ } else {
+ failureReason = se.getMessage();
}
+ } catch (IOException e) {
+ LOG.warn("Pipeline creation failed for repConfig: {}. "
+ + "Retrying get pipelines call once.", repConfig, e);
+ failureReason = e.getMessage();
+ }
+
+ // If Exception occurred or successful creation of pipeline do one
+ // final try to fetch pipelines.
+ containerInfo = getContainer(repConfig, owner, excludeList, req);
+ if (containerInfo != null) {
+ return containerInfo;
}
// we have tried all strategies we know but somehow we are not able
@@ -182,6 +150,22 @@ public class WritableRatisContainerProvider
+ ", replicationConfig: " + repConfig + ". " + failureReason);
}
+ @Nullable
+ private ContainerInfo getContainer(ReplicationConfig repConfig, String owner,
+ ExcludeList excludeList, PipelineRequestInformation req) {
+ // Acquire pipeline manager lock, to avoid any updates to pipeline
+ // while allocate container happens. This is to avoid scenario like
+ // mentioned in HDDS-5655.
+ pipelineManager.acquireReadLock();
+ try {
+ List<Pipeline> availablePipelines = findPipelinesByState(repConfig,
+ excludeList, Pipeline.PipelineState.OPEN);
+ return selectContainer(availablePipelines, req, owner, excludeList);
+ } finally {
+ pipelineManager.releaseReadLock();
+ }
+ }
+
private List<Pipeline> findPipelinesByState(
final ReplicationConfig repConfig,
final ExcludeList excludeList,
@@ -197,23 +181,26 @@ public class WritableRatisContainerProvider
return pipelines;
}
- private ContainerInfo selectContainer(List<Pipeline> availablePipelines,
- long size, String owner, ExcludeList excludeList) {
- Pipeline pipeline;
- ContainerInfo containerInfo;
+ private @Nullable ContainerInfo selectContainer(
+ List<Pipeline> availablePipelines, PipelineRequestInformation req,
+ String owner, ExcludeList excludeList) {
- PipelineRequestInformation pri =
- PipelineRequestInformation.Builder.getBuilder().setSize(size)
- .build();
- pipeline = pipelineChoosePolicy.choosePipeline(
- availablePipelines, pri);
+ while (!availablePipelines.isEmpty()) {
+ Pipeline pipeline = pipelineChoosePolicy.choosePipeline(
+ availablePipelines, req);
- // look for OPEN containers that match the criteria.
- containerInfo = containerManager.getMatchingContainer(size, owner,
- pipeline, excludeList.getContainerIds());
+ // look for OPEN containers that match the criteria.
+ final ContainerInfo containerInfo = containerManager.getMatchingContainer(
+ req.getSize(), owner, pipeline, excludeList.getContainerIds());
- return containerInfo;
+ if (containerInfo != null) {
+ return containerInfo;
+ }
+
+ availablePipelines.remove(pipeline);
+ }
+ return null;
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java
new file mode 100644
index 0000000000..06a76338e5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class TestWritableRatisContainerProvider {
+
+ private static final ReplicationConfig REPLICATION_CONFIG =
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+ private static final String OWNER = "owner";
+ private static final int CONTAINER_SIZE = 1234;
+ private static final ExcludeList NO_EXCLUSION = new ExcludeList();
+
+ private final OzoneConfiguration conf = new OzoneConfiguration();
+ private final PipelineChoosePolicy policy = new RandomPipelineChoosePolicy();
+ private final AtomicLong containerID = new AtomicLong(1);
+
+ @Mock
+ private PipelineManager pipelineManager;
+
+ @Mock
+ private ContainerManager containerManager;
+
+ @Test
+ void returnsExistingContainer() throws Exception {
+ Pipeline pipeline = MockPipeline.createPipeline(3);
+ ContainerInfo existingContainer = pipelineHasContainer(pipeline);
+
+ existingPipelines(pipeline);
+
+ ContainerInfo container = createSubject().getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION);
+
+ assertSame(existingContainer, container);
+ verifyPipelineNotCreated();
+ }
+
+ @Test
+ void skipsPipelineWithoutContainer() throws Exception {
+ Pipeline pipeline = MockPipeline.createPipeline(3);
+ ContainerInfo existingContainer = pipelineHasContainer(pipeline);
+
+ Pipeline pipelineWithoutContainer = MockPipeline.createPipeline(3);
+ existingPipelines(pipelineWithoutContainer, pipeline);
+
+ ContainerInfo container = createSubject().getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION);
+
+ assertSame(existingContainer, container);
+ verifyPipelineNotCreated();
+ }
+
+ @Test
+ void createsNewContainerIfNoneFound() throws Exception {
+ ContainerInfo newContainer = createNewContainerOnDemand();
+
+ ContainerInfo container = createSubject().getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION);
+
+ assertSame(newContainer, container);
+ verifyPipelineCreated();
+ }
+
+ @Test
+ void failsIfContainerCannotBeCreated() throws Exception {
+ throwWhenCreatePipeline();
+
+ assertThrows(IOException.class,
+ () -> createSubject().getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION));
+
+ verifyPipelineCreated();
+ }
+
+ private void existingPipelines(Pipeline... pipelines) {
+ existingPipelines(asList(pipelines));
+ }
+
+ private void existingPipelines(List<Pipeline> pipelines) {
+ when(pipelineManager.getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet()))
+ .thenReturn(pipelines);
+ }
+
+ private ContainerInfo pipelineHasContainer(Pipeline pipeline) {
+ ContainerInfo container = new ContainerInfo.Builder()
+ .setContainerID(containerID.getAndIncrement())
+ .setPipelineID(pipeline.getId())
+ .build();
+
+ when(containerManager.getMatchingContainer(CONTAINER_SIZE, OWNER, pipeline, emptySet()))
+ .thenReturn(container);
+
+ return container;
+ }
+
+ private ContainerInfo createNewContainerOnDemand() throws IOException {
+ Pipeline newPipeline = MockPipeline.createPipeline(3);
+ when(pipelineManager.createPipeline(REPLICATION_CONFIG))
+ .thenReturn(newPipeline);
+
+ when(pipelineManager.getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet()))
+ .thenReturn(emptyList())
+ .thenReturn(singletonList(newPipeline));
+
+ return pipelineHasContainer(newPipeline);
+ }
+
+ private void throwWhenCreatePipeline() throws IOException {
+ when(pipelineManager.createPipeline(REPLICATION_CONFIG))
+ .thenThrow(new SCMException(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE));
+ }
+
+ private WritableRatisContainerProvider createSubject() {
+ return new WritableRatisContainerProvider(conf,
+ pipelineManager, containerManager, policy);
+ }
+
+ private void verifyPipelineCreated() throws IOException {
+ verify(pipelineManager, times(2))
+ .getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet());
+ verify(pipelineManager)
+ .createPipeline(REPLICATION_CONFIG);
+ }
+
+ private void verifyPipelineNotCreated() throws IOException {
+ verify(pipelineManager, times(1))
+ .getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet());
+ verify(pipelineManager, never())
+ .createPipeline(REPLICATION_CONFIG);
+ }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSafeMode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSafeMode.java
index d97bc5b337..f285abfaf7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSafeMode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSafeMode.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.fs.ozone;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.SafeMode;
import org.apache.hadoop.fs.SafeModeAction;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClusterProvider;
@@ -30,18 +32,24 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.net.URI;
import java.util.function.Function;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.ozone.OzoneConsts.MB;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@Timeout(60)
class TestSafeMode {
private static final String VOLUME = "vol";
@@ -106,6 +114,13 @@ class TestSafeMode {
// force exit safe mode and verify that it's out of safe mode.
subject.setSafeMode(SafeModeAction.FORCE_EXIT);
assertFalse(subject.setSafeMode(SafeModeAction.GET));
+
+ // datanodes are still stopped
+ RatisReplicationConfig replication =
+ RatisReplicationConfig.getInstance(THREE);
+ assertThrows(IOException.class, () -> cluster.getStorageContainerManager()
+ .getWritableContainerFactory()
+ .getContainer(MB, replication, OZONE, new ExcludeList()));
} finally {
IOUtils.closeQuietly(fs);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]