This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2cbb0e634a HDDS-9479. Pipeline close doesn't wait for containers to be
closed. (#5604)
2cbb0e634a is described below
commit 2cbb0e634ae065c26c26d7db74c7c139034cc876
Author: Nandakumar Vadivelu <[email protected]>
AuthorDate: Wed Nov 29 16:16:46 2023 +0530
HDDS-9479. Pipeline close doesn't wait for containers to be closed. (#5604)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 8 ++-
.../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 16 ++++++
.../hadoop/hdds/scm/node/DeadNodeHandler.java | 6 +--
.../hdds/scm/node/HealthyReadOnlyNodeHandler.java | 2 +-
.../hadoop/hdds/scm/node/StaleNodeHandler.java | 4 +-
.../hdds/scm/node/StartDatanodeAdminHandler.java | 4 +-
.../hdds/scm/pipeline/PipelineActionHandler.java | 3 +-
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 5 ++
.../hdds/scm/pipeline/PipelineManagerImpl.java | 63 ++++++++++++++--------
.../hdds/scm/pipeline/MockPipelineManager.java | 10 ++++
.../scm/pipeline/TestPipelineActionHandler.java | 8 +--
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 29 +++++++---
.../hdds/scm/pipeline/TestNode2PipelineMap.java | 4 +-
.../hdds/scm/pipeline/TestPipelineClose.java | 8 ++-
.../client/rpc/TestContainerStateMachine.java | 3 ++
.../rpc/TestContainerStateMachineFailures.java | 9 ++--
.../rpc/TestContainerStateMachineFlushDelay.java | 4 ++
...estOzoneClientRetriesOnExceptionFlushDelay.java | 4 ++
.../rpc/TestOzoneClientRetriesOnExceptions.java | 4 ++
.../ozone/client/rpc/TestWatchForCommit.java | 9 ++--
.../hadoop/ozone/recon/TestReconAsPassiveScm.java | 3 +-
.../scm/node/TestDecommissionAndMaintenance.java | 4 ++
.../ozone/scm/pipeline/TestSCMPipelineMetrics.java | 10 ++--
.../ozone/recon/scm/ReconPipelineManager.java | 3 +-
24 files changed, 158 insertions(+), 65 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 1deff3e409..a2c00d5c21 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -420,8 +420,12 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT =
"ozone.scm.pipeline.destroy.timeout";
+ // We wait for 150s before closing containers
+ // OzoneConfigKeys#OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION.
+ // So, we are waiting for another 150s before deleting the pipeline
+ // (150 + 150) = 300s
public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT =
- "66s";
+ "300s";
public static final String OZONE_SCM_PIPELINE_CREATION_INTERVAL =
"ozone.scm.pipeline.creation.interval";
@@ -431,7 +435,7 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL =
"ozone.scm.pipeline.scrub.interval";
public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT =
- "5m";
+ "150s";
// Allow SCM to auto create factor ONE ratis pipeline.
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 42aff0678a..486f6781f9 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -87,10 +87,21 @@ public final class Pipeline {
// suggested leader id with high priority
private final UUID suggestedLeaderId;
+ private final Instant stateEnterTime;
+
/**
* The immutable properties of pipeline object is used in
* ContainerStateManager#getMatchingContainerByPipeline to take a lock on
* the container allocations for a particular pipeline.
+ * <br><br>
+ * Since the Pipeline class is immutable, if we want to change the state of
+ * the Pipeline we should create a new Pipeline object with the new state.
+ * Make sure that you set the value of <i>creationTimestamp</i> properly
while
+ * creating the new Pipeline object.
+ * <br><br>
+ * There is no need to worry about the value of <i>stateEnterTime</i> as it's
+ set to <i>Instant.now</i> when you create the Pipeline object as part of
+ * state change.
*/
private Pipeline(PipelineID id,
ReplicationConfig replicationConfig, PipelineState state,
@@ -102,6 +113,7 @@ public final class Pipeline {
this.creationTimestamp = Instant.now();
this.suggestedLeaderId = suggestedLeaderId;
this.replicaIndexes = new HashMap<>();
+ this.stateEnterTime = Instant.now();
}
/**
@@ -140,6 +152,10 @@ public final class Pipeline {
return creationTimestamp;
}
+ public Instant getStateEnterTime() {
+ return stateEnterTime;
+ }
+
/**
* Return the suggested leaderId which has a high priority among DNs of the
* pipeline.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index b95998c1da..f05eb761d9 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -80,8 +80,8 @@ public class DeadNodeHandler implements
EventHandler<DatanodeDetails> {
* action.
*/
LOG.info("A dead datanode is detected. {}", datanodeDetails);
- destroyPipelines(datanodeDetails);
closeContainers(datanodeDetails, publisher);
+ destroyPipelines(datanodeDetails);
// Remove the container replicas associated with the dead node unless it
// is IN_MAINTENANCE
@@ -122,8 +122,8 @@ public class DeadNodeHandler implements
EventHandler<DatanodeDetails> {
.ifPresent(pipelines ->
pipelines.forEach(id -> {
try {
- pipelineManager.closePipeline(
- pipelineManager.getPipeline(id), false);
+ pipelineManager.closePipeline(id);
+ pipelineManager.deletePipeline(id);
} catch (PipelineNotFoundException ignore) {
// Pipeline is not there in pipeline manager,
// should we care?
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HealthyReadOnlyNodeHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HealthyReadOnlyNodeHandler.java
index 3286133009..2256829942 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HealthyReadOnlyNodeHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HealthyReadOnlyNodeHandler.java
@@ -90,7 +90,7 @@ public class HealthyReadOnlyNodeHandler
pipelineID, pipeline.getPipelineState(),
HddsProtos.NodeState.HEALTHY_READONLY,
datanodeDetails.getUuidString());
- pipelineManager.closePipeline(pipeline, true);
+ pipelineManager.closePipeline(pipelineID);
} catch (IOException ex) {
LOG.error("Failed to close pipeline {} which uses HEALTHY READONLY " +
"datanode {}: ", pipelineID, datanodeDetails, ex);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
index dd8cea3669..c58cd054f9 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -59,8 +58,7 @@ public class StaleNodeHandler implements
EventHandler<DatanodeDetails> {
datanodeDetails, pipelineIds);
for (PipelineID pipelineID : pipelineIds) {
try {
- Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
- pipelineManager.closePipeline(pipeline, true);
+ pipelineManager.closePipeline(pipelineID);
} catch (IOException e) {
LOG.info("Could not finalize pipeline={} for dn={}", pipelineID,
datanodeDetails);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java
index 783eb1358e..0951ea8114 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -57,8 +56,7 @@ public class StartDatanodeAdminHandler
datanodeDetails, pipelineIds);
for (PipelineID pipelineID : pipelineIds) {
try {
- Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
- pipelineManager.closePipeline(pipeline, false);
+ pipelineManager.closePipeline(pipelineID);
} catch (IOException e) {
LOG.info("Could not finalize pipeline={} for dn={}", pipelineID,
datanodeDetails);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index e33f256a44..2f1785cf17 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -84,8 +84,7 @@ public class PipelineActionHandler
info.getDetailedReason());
if (action == PipelineAction.Action.CLOSE) {
- pipelineManager.closePipeline(
- pipelineManager.getPipeline(pid), false);
+ pipelineManager.closePipeline(pid);
} else {
LOG.error("unknown pipeline action:{}", action);
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 2df7e6db5f..15b0f408c5 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -115,9 +115,14 @@ public interface PipelineManager extends Closeable,
PipelineManagerMXBean {
void openPipeline(PipelineID pipelineId) throws IOException;
+ @Deprecated
void closePipeline(Pipeline pipeline, boolean onTimeout)
throws IOException;
+ void closePipeline(PipelineID pipelineID) throws IOException;
+
+ void deletePipeline(PipelineID pipelineID) throws IOException;
+
void closeStalePipelines(DatanodeDetails datanodeDetails);
void scrubPipelines() throws IOException;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 56a6de3e05..000d3e7363 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -489,17 +489,25 @@ public class PipelineManagerImpl implements
PipelineManager {
* put pipeline in CLOSED state.
* @param pipeline - ID of the pipeline.
* @param onTimeout - whether to remove pipeline after some time.
- * @throws IOException
+ * @throws IOException throws exception in case of failure
+ * @deprecated Do not use this method, onTimeout is not honored.
*/
- @Override
+ @Deprecated
public void closePipeline(Pipeline pipeline, boolean onTimeout)
- throws IOException {
- PipelineID pipelineID = pipeline.getId();
+ throws IOException {
+ closePipeline(pipeline.getId());
+ }
+
+ /**
+ * Move the Pipeline to CLOSED state.
+ * @param pipelineID ID of the Pipeline to be closed
+ * @throws IOException In case of exception while closing the Pipeline
+ */
+ public void closePipeline(PipelineID pipelineID) throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
// close containers.
closeContainersForPipeline(pipelineID);
-
- if (!pipeline.isClosed()) {
+ if (!getPipeline(pipelineID).isClosed()) {
acquireWriteLock();
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
@@ -507,15 +515,20 @@ public class PipelineManagerImpl implements
PipelineManager {
} finally {
releaseWriteLock();
}
- LOG.info("Pipeline {} moved to CLOSED state", pipeline);
+ LOG.info("Pipeline {} moved to CLOSED state", pipelineID);
}
metrics.removePipelineMetrics(pipelineID);
- if (!onTimeout) {
- // close pipeline right away.
- removePipeline(pipeline);
- }
+ }
+
+ /**
+ * Deletes the Pipeline for the given PipelineID.
+ * @param pipelineID ID of the Pipeline to be deleted
+ * @throws IOException In case of exception while deleting the Pipeline
+ */
+ public void deletePipeline(PipelineID pipelineID) throws IOException {
+ removePipeline(getPipeline(pipelineID));
}
/** close the pipelines whose nodes' IPs are stale.
@@ -535,9 +548,10 @@ public class PipelineManagerImpl implements
PipelineManager {
pipelinesWithStaleIpOrHostname.size());
pipelinesWithStaleIpOrHostname.forEach(p -> {
try {
- LOG.info("Closing the stale pipeline: {}", p.getId());
- closePipeline(p, false);
- LOG.info("Closed the stale pipeline: {}", p.getId());
+ final PipelineID id = p.getId();
+ LOG.info("Closing the stale pipeline: {}", id);
+ closePipeline(id);
+ deletePipeline(id);
} catch (IOException e) {
LOG.error("Closing the stale pipeline failed: {}", p, e);
}
@@ -568,26 +582,34 @@ public class PipelineManagerImpl implements
PipelineManager {
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
+ long pipelineDeleteTimoutInMills = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
List<Pipeline> candidates = stateManager.getPipelines();
for (Pipeline p : candidates) {
+ final PipelineID id = p.getId();
// scrub pipelines who stay ALLOCATED for too long.
if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
(currentTime.toEpochMilli() - p.getCreationTimestamp()
.toEpochMilli() >= pipelineScrubTimeoutInMills)) {
+
LOG.info("Scrubbing pipeline: id: {} since it stays at ALLOCATED " +
- "stage for {} mins.", p.getId(),
+ "stage for {} mins.", id,
Duration.between(currentTime, p.getCreationTimestamp())
.toMinutes());
- closePipeline(p, false);
+ closePipeline(id);
+ deletePipeline(id);
}
// scrub pipelines who stay CLOSED for too long.
- if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) {
+ if (p.getPipelineState() == Pipeline.PipelineState.CLOSED &&
+ (currentTime.toEpochMilli() - p.getStateEnterTime().toEpochMilli())
+ >= pipelineDeleteTimoutInMills) {
LOG.info("Scrubbing pipeline: id: {} since it stays at CLOSED stage.",
p.getId());
- closeContainersForPipeline(p.getId());
- removePipeline(p);
+ deletePipeline(id);
}
// If a datanode is stopped and then SCM is restarted, a pipeline can get
// stuck in an open state. For Ratis, provided some other DNs that were
@@ -599,8 +621,7 @@ public class PipelineManagerImpl implements PipelineManager
{
if (isOpenWithUnregisteredNodes(p)) {
LOG.info("Scrubbing pipeline: id: {} as it has unregistered nodes",
p.getId());
- closeContainersForPipeline(p.getId());
- closePipeline(p, true);
+ closePipeline(id);
}
}
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 45ab65cd3f..6ece2ecb88 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -243,6 +243,16 @@ public class MockPipelineManager implements
PipelineManager {
HddsProtos.PipelineState.PIPELINE_CLOSED);
}
+ @Override
+ public void closePipeline(PipelineID pipelineID) throws IOException {
+
+ }
+
+ @Override
+ public void deletePipeline(PipelineID pipelineID) throws IOException {
+
+ }
+
@Override
public void closeStalePipelines(DatanodeDetails datanodeDetails) {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
index 6ac0b53863..791220f670 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.hdds.scm.ha.SCMContext;
import
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import java.io.IOException;
import java.util.UUID;
/**
@@ -39,12 +39,12 @@ public class TestPipelineActionHandler {
@Test
public void testCloseActionForMissingPipeline()
- throws PipelineNotFoundException, NotLeaderException {
+ throws IOException {
final PipelineManager manager = Mockito.mock(PipelineManager.class);
final EventQueue queue = Mockito.mock(EventQueue.class);
- Mockito.when(manager.getPipeline(Mockito.any(PipelineID.class)))
- .thenThrow(new PipelineNotFoundException());
+ Mockito.doThrow(new PipelineNotFoundException())
+ .when(manager).closePipeline(Mockito.any(PipelineID.class));
final PipelineActionHandler actionHandler =
new PipelineActionHandler(manager, SCMContext.emptyContext(), null);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 33243b650e..48f82b5cc9 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -86,6 +86,7 @@ import java.util.stream.Collectors;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
import static
org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED;
import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
@@ -341,7 +342,9 @@ public class TestPipelineManagerImpl {
}
// Destroy pipeline
- pipelineManager.closePipeline(pipeline, false);
+ pipelineManager.closePipeline(pipeline.getId());
+ pipelineManager.deletePipeline(pipeline.getId());
+
try {
pipelineManager.getPipeline(pipeline.getId());
fail("Pipeline should not have been retrieved");
@@ -393,7 +396,8 @@ public class TestPipelineManagerImpl {
pipelineManager.getPipeline(pipeline.getId()).isOpen());
// close the pipeline
- pipelineManager.closePipeline(pipeline, false);
+ pipelineManager.closePipeline(pipeline.getId());
+ pipelineManager.deletePipeline(pipeline.getId());
// pipeline report for destroyed pipeline should be ignored
nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
@@ -514,6 +518,8 @@ public class TestPipelineManagerImpl {
// Allocated pipelines should not be scrubbed for 50 seconds.
conf.setTimeDuration(
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, 50, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 50, TimeUnit.SECONDS);
PipelineManagerImpl pipelineManager = createPipelineManager(true);
Pipeline allocatedPipeline = pipelineManager
@@ -553,8 +559,9 @@ public class TestPipelineManagerImpl {
.getInstance(ReplicationFactor.THREE),
Pipeline.PipelineState.ALLOCATED).contains(allocatedPipeline));
- // The closedPipeline should be scrubbed, as they are scrubbed immediately
- Assertions.assertFalse(pipelineManager
+ // The closedPipeline should not be scrubbed as the interval has not
+ // yet passed.
+ Assertions.assertTrue(pipelineManager
.getPipelines(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE),
Pipeline.PipelineState.CLOSED).contains(closedPipeline));
@@ -569,6 +576,12 @@ public class TestPipelineManagerImpl {
.getInstance(ReplicationFactor.THREE),
Pipeline.PipelineState.ALLOCATED).contains(allocatedPipeline));
+ // The closedPipeline should now be scrubbed as the interval has passed
+ Assertions.assertFalse(pipelineManager
+ .getPipelines(RatisReplicationConfig
+ .getInstance(ReplicationFactor.THREE),
+ Pipeline.PipelineState.CLOSED).contains(closedPipeline));
+
pipelineManager.close();
}
@@ -742,11 +755,11 @@ public class TestPipelineManagerImpl {
addContainer(containerInfo.getProtobuf());
//Add Container to PipelineStateMap
pipelineManager.addContainerToPipeline(pipelineID, containerID);
- pipelineManager.closePipeline(pipeline, false);
+ pipelineManager.closePipeline(pipelineID);
String containerExpectedOutput = "Container " + containerID +
" closed for pipeline=" + pipelineID;
String pipelineExpectedOutput =
- "Pipeline " + pipeline + " moved to CLOSED state";
+ "Pipeline " + pipelineID + " moved to CLOSED state";
String logOutput = logCapturer.getOutput();
assertTrue(logOutput.contains(containerExpectedOutput));
assertTrue(logOutput.contains(pipelineExpectedOutput));
@@ -847,9 +860,9 @@ public class TestPipelineManagerImpl {
pipelineManager.closeStalePipelines(datanodeDetails);
verify(pipelineManager, times(1))
- .closePipeline(stalePipelines.get(0), false);
+ .closePipeline(stalePipelines.get(0).getId());
verify(pipelineManager, times(1))
- .closePipeline(stalePipelines.get(1), false);
+ .closePipeline(stalePipelines.get(1).getId());
}
@Test
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index 6604cd652f..dd0e5d9f31 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -116,8 +116,8 @@ public class TestNode2PipelineMap {
ratisContainer.getPipeline().getId());
Assertions.assertEquals(0, set2.size());
- pipelineManager
- .closePipeline(ratisContainer.getPipeline(), false);
+ pipelineManager.closePipeline(ratisContainer.getPipeline().getId());
+ pipelineManager.deletePipeline(ratisContainer.getPipeline().getId());
pipelines = scm.getScmNodeManager()
.getPipelines(dns.get(0));
Assertions.assertFalse(
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index bc312719cf..a5ca909b3e 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -39,6 +39,7 @@ import
org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -84,6 +85,9 @@ public class TestPipelineClose {
@BeforeEach
public void init() throws Exception {
conf = new OzoneConfiguration();
+ conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
TimeUnit.MILLISECONDS);
@@ -136,8 +140,8 @@ public class TestPipelineClose {
.getContainersInPipeline(ratisContainer.getPipeline().getId());
Assert.assertEquals(0, setClosed.size());
- pipelineManager
- .closePipeline(ratisContainer.getPipeline(), false);
+ pipelineManager.closePipeline(ratisContainer.getPipeline().getId());
+ pipelineManager.deletePipeline(ratisContainer.getPipeline().getId());
for (DatanodeDetails dn : ratisContainer.getPipeline().getNodes()) {
// Assert that the pipeline has been removed from Node2PipelineMap as
well
Assert.assertFalse(scm.getScmNodeManager().getPipelines(dn)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index ab36e079c1..7c0fcd4372 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -107,6 +107,9 @@ public class TestContainerStateMachine {
conf.setQuietMode(false);
OzoneManager.setTestSecureOmFlag(true);
conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
+ conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setStreamBufferFlushDelay(false);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 834669bfcd..717304a5d0 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -43,6 +43,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -84,7 +85,6 @@ import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERV
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.apache.ratis.protocol.RaftGroupId;
@@ -136,8 +136,9 @@ public class TestContainerStateMachineFailures {
conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
- conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1,
- TimeUnit.SECONDS);
+ conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
RatisClientConfig ratisClientConfig =
conf.getObject(RatisClientConfig.class);
@@ -480,7 +481,7 @@ public class TestContainerStateMachineFailures {
}
// when remove pipeline, group dir including snapshot will be deleted
- LambdaTestUtils.await(5000, 500,
+ LambdaTestUtils.await(10000, 500,
() -> (!snapshot.getPath().toFile().exists()));
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
index 8be227fd18..c24f209cde 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -111,6 +112,9 @@ public class TestContainerStateMachineFlushDelay {
OzoneManager.setTestSecureOmFlag(true);
conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
// conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
+ conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
.setBlockSize(blockSize)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
index 7714f9d226..f3a0142f48 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
@@ -100,6 +101,9 @@ public class TestOzoneClientRetriesOnExceptionFlushDelay {
conf.setFromObject(config);
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3);
+ conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java
index c84ea1c8c1..e9497cd73f 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
@@ -107,6 +108,9 @@ public class TestOzoneClientRetriesOnExceptions {
conf.setFromObject(clientConfig);
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3);
+ conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index b52b10ed53..b9fb0d425b 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -63,7 +64,6 @@ import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Flaky;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.junit.Assert;
@@ -109,9 +109,10 @@ public class TestWatchForCommit {
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setStreamBufferFlushDelay(false);
conf.setFromObject(clientConfig);
-
- conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
- TimeUnit.SECONDS);
+
+ conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
conf.setQuietMode(false);
conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java
index 556923ee33..1365872e8b 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java
@@ -180,7 +180,8 @@ public class TestReconAsPassiveScm {
.filter(p -> !p.getId().equals(containerInfo.getPipelineID()))
.findFirst();
assertTrue(pipelineToClose.isPresent());
- scmPipelineManager.closePipeline(pipelineToClose.get(), false);
+ scmPipelineManager.closePipeline(pipelineToClose.get().getId());
+ scmPipelineManager.deletePipeline(pipelineToClose.get().getId());
// Start Recon
cluster.startRecon();
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index 073e0e3bbd..2df0d09db5 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneClusterProvider;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
@@ -134,6 +135,9 @@ public class TestDecommissionAndMaintenance {
1, SECONDS);
conf.setTimeDuration(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
0, SECONDS);
+ conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
ReplicationManagerConfiguration replicationConf =
conf.getObject(ReplicationManagerConfiguration.class);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
index 52bd98de35..53f4ce5e16 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
@@ -86,10 +86,12 @@ public class TestSCMPipelineMetrics {
Optional<Pipeline> pipeline = pipelineManager
.getPipelines().stream().findFirst();
Assertions.assertTrue(pipeline.isPresent());
- Assertions.assertDoesNotThrow(() ->
- cluster.getStorageContainerManager()
- .getPipelineManager()
- .closePipeline(pipeline.get(), false));
+ Assertions.assertDoesNotThrow(() -> {
+ PipelineManager pm = cluster.getStorageContainerManager()
+ .getPipelineManager();
+ pm.closePipeline(pipeline.get().getId());
+ pm.deletePipeline(pipeline.get().getId());
+ });
MetricsRecordBuilder metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
assertCounter("NumPipelineDestroyed", 1L, metrics);
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index 0cde1c687a..fab30cf20b 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -150,7 +150,8 @@ public final class ReconPipelineManager extends PipelineManagerImpl {
}
try {
LOG.info("Removing invalid pipeline {} from Recon.", pipelineID);
- closePipeline(p, false);
+ closePipeline(p.getId());
+ deletePipeline(p.getId());
} catch (IOException e) {
LOG.warn("Unable to remove pipeline {}", pipelineID, e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]