This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-14496-zdu
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-14496-zdu by this push:
new bb546a28374 HDDS-14646. SCM should not close Ratis pipelines on Finalize (#9779)
bb546a28374 is described below
commit bb546a28374c1b5e06feebb9300b8c734b79bb4e
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Tue Feb 24 21:39:39 2026 +0000
HDDS-14646. SCM should not close Ratis pipelines on Finalize (#9779)
---
.../upgrade/DataNodeUpgradeFinalizer.java | 41 ------
.../apache/hadoop/hdds/scm/node/NodeManager.java | 4 +-
.../hadoop/hdds/scm/node/SCMNodeManager.java | 6 -
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 12 --
.../hdds/scm/pipeline/PipelineManagerImpl.java | 39 +-----
.../scm/safemode/HealthyPipelineSafeModeRule.java | 17 +--
.../hdds/scm/safemode/SafeModeRuleFactory.java | 2 +-
.../scm/server/upgrade/FinalizationManager.java | 7 -
.../server/upgrade/FinalizationManagerImpl.java | 5 -
.../upgrade/FinalizationStateManagerImpl.java | 22 ---
.../scm/server/upgrade/SCMUpgradeFinalizer.java | 152 ++++++++++-----------
.../hdds/scm/pipeline/MockPipelineManager.java | 15 --
.../hdds/scm/upgrade/TestScmFinalization.java | 30 ----
.../hadoop/hdds/upgrade/TestHddsUpgradeUtils.java | 52 +------
.../hadoop/hdds/upgrade/TestScmHAFinalization.java | 25 +---
15 files changed, 85 insertions(+), 344 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
index 5cb06e88e6c..769d556643c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
@@ -17,15 +17,8 @@
package org.apache.hadoop.ozone.container.upgrade;
-import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.PREFINALIZE_VALIDATION_FAILED;
-import static org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.FINALIZATION_IN_PROGRESS;
-import static org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.FINALIZATION_REQUIRED;
-
-import java.io.IOException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
import org.apache.hadoop.ozone.upgrade.LayoutFeature;
@@ -41,40 +34,6 @@ public DataNodeUpgradeFinalizer(HDDSLayoutVersionManager versionManager) {
super(versionManager);
}
- @Override
- public void preFinalizeUpgrade(DatanodeStateMachine dsm)
- throws IOException {
- if (!canFinalizeDataNode(dsm)) {
- // DataNode is not yet ready to finalize.
- // Reset the Finalization state.
- getVersionManager().setUpgradeState(FINALIZATION_REQUIRED);
- String msg = "Pre Finalization checks failed on the DataNode.";
- logAndEmit(msg);
- throw new UpgradeException(msg, PREFINALIZE_VALIDATION_FAILED);
- }
- getVersionManager().setUpgradeState(FINALIZATION_IN_PROGRESS);
- }
-
- private boolean canFinalizeDataNode(DatanodeStateMachine dsm) {
- // Lets be sure that we do not have any open container before we return
- // from here. This function should be called in its own finalizer thread
- // context.
- for (Container<?> ctr :
- dsm.getContainer().getController().getContainers()) {
- ContainerProtos.ContainerDataProto.State state = ctr.getContainerState();
- long id = ctr.getContainerData().getContainerID();
- switch (state) {
- case OPEN:
- case CLOSING:
- LOG.warn("FinalizeUpgrade : Waiting for container {} to close, current
"
- + "state is: {}", id, state);
- return false;
- default:
- }
- }
- return true;
- }
-
@Override
public void finalizeLayoutFeature(LayoutFeature layoutFeature,
DatanodeStateMachine dsm) throws UpgradeException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index e9a019945c1..e1cf9a45bf5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -408,9 +408,7 @@ default Collection<DatanodeDetails> getPeerList(DatanodeDetails dn) {
default HDDSLayoutVersionManager getLayoutVersionManager() {
return null;
}
-
- default void forceNodesToHealthyReadOnly() { }
-
+
/**
* This API allows removal of only DECOMMISSIONED, IN_MAINTENANCE and DEAD nodes
* from NodeManager data structures and cleanup memory.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 3289e7b312a..ff561411f34 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -1958,12 +1958,6 @@ public HDDSLayoutVersionManager getLayoutVersionManager() {
return scmLayoutVersionManager;
}
- @VisibleForTesting
- @Override
- public void forceNodesToHealthyReadOnly() {
- nodeStateManager.forceNodesToHealthyReadOnly();
- }
-
private ReentrantReadWriteLock.WriteLock writeLock() {
return lock.writeLock();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 6a448d6c88d..4af0d60634a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -180,18 +180,6 @@ default Pipeline waitOnePipelineReady(Collection<PipelineID> pipelineIDs,
void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
throws RocksDatabaseException, DuplicatedPipelineIdException, CodecException;
- /**
- * Ask pipeline manager to not create any new pipelines.
- */
- void freezePipelineCreation();
-
- /**
- * Ask pipeline manager to resume creating new pipelines.
- */
- void resumePipelineCreation();
-
- boolean isPipelineCreationFrozen();
-
/**
* Acquire read lock.
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 9c529e22e7e..bf33282aaf4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -31,7 +31,6 @@
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.ObjectName;
@@ -56,7 +55,6 @@
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.db.CodecException;
import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
@@ -93,9 +91,6 @@ public class PipelineManagerImpl implements PipelineManager {
private final SCMHAManager scmhaManager;
private SCMContext scmContext;
private final NodeManager nodeManager;
- // This allows for freezing/resuming the new pipeline creation while the
- // SCM is already out of SafeMode.
- private AtomicBoolean freezePipelineCreation;
private final Clock clock;
@SuppressWarnings("checkstyle:parameterNumber")
@@ -123,7 +118,6 @@ protected PipelineManagerImpl(ConfigurationSource conf,
HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
- this.freezePipelineCreation = new AtomicBoolean();
}
@SuppressWarnings("checkstyle:parameterNumber")
@@ -160,13 +154,7 @@ public static PipelineManagerImpl newPipelineManager(
pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
serviceManager.register(backgroundPipelineCreator);
-
- if (FinalizationManager.shouldCreateNewPipelines(
- scmContext.getFinalizationCheckpoint())) {
- pipelineManager.resumePipelineCreation();
- } else {
- pipelineManager.freezePipelineCreation();
- }
+ backgroundPipelineCreator.start();
final long scrubberIntervalInMillis = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL,
@@ -276,13 +264,6 @@ private void checkIfPipelineCreationIsAllowed(
throw new IOException("Pipeline creation is not allowed as safe mode " +
"prechecks have not yet passed");
}
-
- if (freezePipelineCreation.get()) {
- String message = "Cannot create new pipelines while pipeline creation " +
- "is frozen.";
- LOG.info(message);
- throw new IOException(message);
- }
}
private void addPipelineToManager(Pipeline pipeline)
@@ -803,24 +784,6 @@ public void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
stateManager.reinitialize(pipelineStore);
}
- @Override
- public void freezePipelineCreation() {
- freezePipelineCreation.set(true);
- backgroundPipelineCreator.stop();
- }
-
- @Override
- public void resumePipelineCreation() {
- freezePipelineCreation.set(false);
- backgroundPipelineCreator.start();
- }
-
- @Override
- public boolean isPipelineCreationFrozen() {
- return freezePipelineCreation.get() &&
- !backgroundPipelineCreator.isRunning();
- }
-
@Override
public void close() throws IOException {
if (backgroundPipelineCreator != null) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 94964df73a9..3ad70fefa04 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -32,14 +32,12 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.slf4j.Logger;
@@ -63,16 +61,14 @@ public class HealthyPipelineSafeModeRule extends SafeModeExitRule<Pipeline> {
private final Set<PipelineID> processedPipelineIDs = new HashSet<>();
private final PipelineManager pipelineManager;
private final int minHealthyPipelines;
- private final SCMContext scmContext;
private final Set<PipelineID> unProcessedPipelineSet = new HashSet<>();
private final NodeManager nodeManager;
HealthyPipelineSafeModeRule(EventQueue eventQueue,
PipelineManager pipelineManager, SCMSafeModeManager manager,
- ConfigurationSource configuration, SCMContext scmContext, NodeManager nodeManager) {
+ ConfigurationSource configuration, NodeManager nodeManager) {
super(manager, eventQueue);
this.pipelineManager = pipelineManager;
- this.scmContext = scmContext;
this.nodeManager = nodeManager;
healthyPipelinesPercent =
configuration.getDouble(HddsConfigKeys.
@@ -116,16 +112,7 @@ protected TypedEvent<Pipeline> getEventType() {
@Override
protected synchronized boolean validate() {
- boolean shouldRunSafemodeCheck =
- FinalizationManager.shouldCreateNewPipelines(
- scmContext.getFinalizationCheckpoint());
- if (!shouldRunSafemodeCheck) {
- LOG.info("All SCM pipelines are closed due to ongoing upgrade " +
- "finalization. Bypassing healthy pipeline safemode rule.");
- return true;
- } else {
- return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
- }
+ return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
}
@Override
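With the finalization bypass removed, the rule reduces to a plain threshold
comparison. A minimal sketch of the idea, assuming the threshold is derived
from the configured healthy-pipeline percentage; the names and rounding below
are illustrative, not the exact Ozone implementation:

    // Illustrative sketch only: the safemode rule now passes once enough
    // pipelines are healthy, regardless of upgrade state. The rounding and
    // minimum handling here are assumptions.
    int threshold = Math.max(minHealthyPipelines,
        (int) Math.ceil(healthyPipelinesPercent * totalPipelineCount));
    boolean exitRulePasses = currentHealthyPipelineCount >= threshold;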
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeRuleFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeRuleFactory.java
index 398eb19b56e..52dfbd17e9e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeRuleFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeRuleFactory.java
@@ -94,7 +94,7 @@ private void loadRules(SCMSafeModeManager safeModeManager) {
if (pipelineManager != null) {
safeModeRules.add(new HealthyPipelineSafeModeRule(eventQueue,
pipelineManager,
- safeModeManager, config, scmContext, nodeManager));
+ safeModeManager, config, nodeManager));
safeModeRules.add(new OneReplicaPipelineSafeModeRule(eventQueue,
pipelineManager,
safeModeManager, config));
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
index 114063c5bce..421837971bb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
@@ -43,8 +43,6 @@ UpgradeFinalization.StatusAndMessages queryUpgradeFinalizationProgress(
BasicUpgradeFinalizer<SCMUpgradeFinalizationContext, HDDSLayoutVersionManager>
getUpgradeFinalizer();
- boolean crossedCheckpoint(FinalizationCheckpoint checkpoint);
-
FinalizationCheckpoint getCheckpoint();
void buildUpgradeContext(NodeManager nodeManager,
@@ -55,11 +53,6 @@ void buildUpgradeContext(NodeManager nodeManager,
void onLeaderReady();
- static boolean shouldCreateNewPipelines(FinalizationCheckpoint checkpoint) {
- return !checkpoint.hasCrossed(FinalizationCheckpoint.FINALIZATION_STARTED)
- || checkpoint.hasCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV);
- }
-
static boolean shouldTellDatanodesToFinalize(
FinalizationCheckpoint checkpoint) {
return checkpoint.hasCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV);
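Because FinalizationCheckpoint values are ordered, the remaining helper gates
its side effect with a single hasCrossed() comparison. A hedged sketch of a
hypothetical call site (the variable names and the heartbeat detail are
illustrative, not from this patch):

    // Illustrative sketch only: checkpoints are ordered, so one hasCrossed()
    // comparison decides whether datanodes should be told to finalize.
    FinalizationCheckpoint checkpoint = finalizationManager.getCheckpoint();
    if (FinalizationManager.shouldTellDatanodesToFinalize(checkpoint)) {
      // e.g. attach a finalize command to the next datanode heartbeat response
    }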
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
index 5155aea425b..be65b933a1b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
@@ -132,11 +132,6 @@ HDDSLayoutVersionManager> getUpgradeFinalizer() {
return upgradeFinalizer;
}
- @Override
- public boolean crossedCheckpoint(FinalizationCheckpoint checkpoint) {
- return finalizationStateManager.crossedCheckpoint(checkpoint);
- }
-
@Override
public FinalizationCheckpoint getCheckpoint() {
return finalizationStateManager.getFinalizationCheckpoint();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
index 77bc3499d9a..4dde4089f0a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
@@ -25,7 +25,6 @@
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.scm.metadata.Replicate;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -72,27 +71,6 @@ private void publishCheckpoint(FinalizationCheckpoint checkpoint) {
// Move the upgrade status according to this checkpoint. This is sent
// back to the client if they query for the current upgrade status.
versionManager.setUpgradeState(checkpoint.getStatus());
-
- // Check whether this checkpoint change requires us to move node state.
- // If this is necessary, it must be done before unfreezing pipeline
- // creation to make sure nodes are not added to pipelines based on
- // outdated layout information.
- // This operation is not idempotent.
- if (checkpoint == FinalizationCheckpoint.MLV_EQUALS_SLV) {
- upgradeContext.getNodeManager().forceNodesToHealthyReadOnly();
- }
-
- // Check whether this checkpoint change requires us to freeze pipeline
- // creation. These are idempotent operations.
- PipelineManager pipelineManager = upgradeContext.getPipelineManager();
- if (FinalizationManager.shouldCreateNewPipelines(checkpoint) &&
- pipelineManager.isPipelineCreationFrozen()) {
- pipelineManager.resumePipelineCreation();
- } else if (!FinalizationManager.shouldCreateNewPipelines(checkpoint) &&
- !pipelineManager.isPipelineCreationFrozen()) {
- pipelineManager.freezePipelineCreation();
- }
-
// Set the checkpoint in the SCM context so other components can read it.
upgradeContext.getSCMContext().setFinalizationCheckpoint(checkpoint);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
index 6fb479592ad..553d1d09b19 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
@@ -17,14 +17,13 @@
package org.apache.hadoop.hdds.scm.server.upgrade;
-import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
-
import java.io.IOException;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
@@ -67,16 +66,11 @@ public void preFinalizeUpgrade(SCMUpgradeFinalizationContext context)
context.getFinalizationStateManager().addFinalizingMark();
}
logCheckpointCrossed(FinalizationCheckpoint.FINALIZATION_STARTED);
-
- if (!stateManager.crossedCheckpoint(
- FinalizationCheckpoint.MLV_EQUALS_SLV)) {
- closePipelinesBeforeFinalization(context.getPipelineManager());
- }
}
@Override
public void finalizeLayoutFeature(LayoutFeature lf,
- SCMUpgradeFinalizationContext context) throws UpgradeException {
+ SCMUpgradeFinalizationContext context) throws UpgradeException {
// Run upgrade actions, update VERSION file, and update layout version in
// DB.
try {
@@ -115,89 +109,89 @@ public void postFinalizeUpgrade(SCMUpgradeFinalizationContext context)
context.getFinalizationStateManager();
if (!stateManager.crossedCheckpoint(
FinalizationCheckpoint.FINALIZATION_COMPLETE)) {
- createPipelinesAfterFinalization(context);
+ waitForDatanodesToFinalize(context);
stateManager.removeFinalizingMark();
}
}
- private void closePipelinesBeforeFinalization(PipelineManager pipelineManager)
- throws IOException {
- /*
- * Before we can call finalize the feature, we need to make sure that
- * all existing pipelines are closed and pipeline Manger would freeze
- * all new pipeline creation.
- */
- String msg = " Existing pipelines and containers will be closed " +
- "during Upgrade.";
- msg += "\n New pipelines creation will remain frozen until Upgrade " +
- "is finalized.";
-
- // Pipeline creation should already be frozen when the finalization state
- // manager set the checkpoint.
- if (!pipelineManager.isPipelineCreationFrozen()) {
- throw new SCMException("Error during finalization. Pipeline creation" +
- "should have been frozen before closing existing pipelines.",
- SCMException.ResultCodes.INTERNAL_ERROR);
- }
-
- for (Pipeline pipeline : pipelineManager.getPipelines()) {
- if (pipeline.getPipelineState() != CLOSED) {
- pipelineManager.closePipeline(pipeline.getId());
- }
- }
-
- // We can not yet move all the existing data nodes to HEALTHY-READONLY
- // state since the next heartbeat will move them back to HEALTHY state.
- // This has to wait till postFinalizeUpgrade, when SCM MLV version is
- // already upgraded as part of finalize processing.
- // While in this state, it should be safe to do finalize processing for
- // all new features. This will also update ondisk mlv version. Any
- // disrupting upgrade can add a hook here to make sure that SCM is in a
- // consistent state while finalizing the upgrade.
-
- logAndEmit(msg);
- }
+ /**
+ * Wait for all HEALTHY datanodes to complete finalization before finishing
+ * SCM finalization. This ensures that when the client receives a
+ * FINALIZATION_DONE status, all healthy datanodes have also finalized.
+ *
+ * A datanode is considered finalized when its metadata layout version (MLV)
+ * equals its software layout version (SLV), indicating it has completed
+ * processing all layout features.
+ *
+ * @param context The finalization context containing node manager reference
+ * @throws SCMException if waiting is interrupted or SCM loses leadership
+ * @throws NotLeaderException if SCM is no longer the leader
+ */
+ private void waitForDatanodesToFinalize(SCMUpgradeFinalizationContext context)
+ throws SCMException, NotLeaderException {
+ NodeManager nodeManager = context.getNodeManager();
- private void createPipelinesAfterFinalization(
- SCMUpgradeFinalizationContext context) throws SCMException,
- NotLeaderException {
- // Pipeline creation should already be resumed when the finalization state
- // manager set the checkpoint.
- PipelineManager pipelineManager = context.getPipelineManager();
- if (pipelineManager.isPipelineCreationFrozen()) {
- throw new SCMException("Error during finalization. Pipeline creation " +
- "should have been resumed before waiting for new pipelines.",
- SCMException.ResultCodes.INTERNAL_ERROR);
- }
+ LOG.info("Waiting for all HEALTHY datanodes to complete finalization
before finishing SCM finalization.");
- // Wait for at least one pipeline to be created before finishing
- // finalization, so clients can write.
- boolean hasPipeline = false;
- while (!hasPipeline) {
+ boolean allDatanodesFinalized = false;
+ while (!allDatanodesFinalized) {
// Break out of the wait and step down from driving finalization if this
// SCM is no longer the leader by throwing NotLeaderException.
context.getSCMContext().getTermOfLeader();
- ReplicationConfig ratisThree =
- ReplicationConfig.fromProtoTypeAndFactor(
- HddsProtos.ReplicationType.RATIS,
- HddsProtos.ReplicationFactor.THREE);
- int pipelineCount =
- pipelineManager.getPipelines(ratisThree, Pipeline.PipelineState.OPEN)
- .size();
-
- hasPipeline = (pipelineCount >= 1);
- if (!hasPipeline) {
- LOG.info("Waiting for at least one open Ratis 3 pipeline after SCM " +
- "finalization.");
+ allDatanodesFinalized = true;
+ int totalHealthyNodes = 0;
+ int finalizedNodes = 0;
+ int unfinalizedNodes = 0;
+
+ for (DatanodeDetails dn : nodeManager.getAllNodes()) {
+ try {
+ // Only check HEALTHY nodes. STALE/DEAD nodes will be told to
+ // finalize when they recover.
+ if (nodeManager.getNodeStatus(dn).isHealthy()) {
+ totalHealthyNodes++;
+ DatanodeInfo datanodeInfo = nodeManager.getDatanodeInfo(dn);
+ if (datanodeInfo == null) {
+ LOG.warn("Could not get DatanodeInfo for {}, skipping in " +
+ "finalization wait.", dn.getHostName());
+ continue;
+ }
+
+ LayoutVersionProto dnLayout = datanodeInfo.getLastKnownLayoutVersion();
+ int dnMlv = dnLayout.getMetadataLayoutVersion();
+ int dnSlv = dnLayout.getSoftwareLayoutVersion();
+
+ if (dnMlv < dnSlv) {
+ // Datanode has not yet finalized
+ allDatanodesFinalized = false;
+ unfinalizedNodes++;
+ LOG.debug("Datanode {} has not yet finalized: MLV={}, SLV={}",
+ dn.getHostName(), dnMlv, dnSlv);
+ } else {
+ finalizedNodes++;
+ }
+ }
+ } catch (NodeNotFoundException e) {
+ // Node was removed while we were iterating. This is OK, skip it.
+ LOG.debug("Node {} not found while waiting for finalization, " +
+ "skipping.", dn);
+ }
+ }
+
+ if (!allDatanodesFinalized) {
+ LOG.info("Waiting for datanodes to finalize. Status: {}/{} healthy " +
+ "datanodes have finalized ({} remaining).",
+ finalizedNodes, totalHealthyNodes, unfinalizedNodes);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
- // Try again on next loop iteration.
Thread.currentThread().interrupt();
+ throw new SCMException("Interrupted while waiting for datanodes to "
+ + "finalize.", SCMException.ResultCodes.INTERNAL_ERROR);
}
} else {
- LOG.info("Open pipeline found after SCM finalization");
+ LOG.info("All {} HEALTHY datanodes have completed finalization.",
+ totalHealthyNodes);
}
}
}
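The wait loop above reduces to one predicate: a datanode counts as finalized
once its reported metadata layout version (MLV) has caught up with its
software layout version (SLV). As a standalone sketch using the
LayoutVersionProto getters seen in the diff (the helper itself is
hypothetical, not part of this patch):

    // Illustrative sketch only: MLV < SLV means layout features remain to be
    // finalized on the datanode; MLV == SLV means it is done.
    static boolean hasFinalized(LayoutVersionProto layout) {
      return layout.getMetadataLayoutVersion()
          >= layout.getSoftwareLayoutVersion();
    }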
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index d6a3fc54635..dbe551fcda7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -287,16 +287,6 @@ public boolean getSafeModeStatus() {
public void reinitialize(Table<PipelineID, Pipeline> pipelineStore) {
}
- @Override
- public void freezePipelineCreation() {
-
- }
-
- @Override
- public void resumePipelineCreation() {
-
- }
-
@Override
public void close() {
}
@@ -326,11 +316,6 @@ public void releaseWriteLock() {
}
- @Override
- public boolean isPipelineCreationFrozen() {
- return false;
- }
-
@Override
public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) {
return false;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
index 67e661bc9d0..c8dce937e41 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
@@ -21,14 +21,12 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.matches;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
@@ -68,10 +66,6 @@ public class TestScmFinalization {
private static final Logger LOG =
LoggerFactory.getLogger(TestScmFinalization.class);
- // Indicates the current state of the mock pipeline manager's pipeline
- // creation.
- private boolean pipelineCreationFrozen = false;
-
/**
* Order of finalization checkpoints within the enum is used to determine
* which ones have been passed. If ordering within the enum is changed
@@ -253,9 +247,6 @@ public void testResumeFinalizationFromCheckpoint(
matches(OzoneConsts.FINALIZING_KEY),
matches(""));
- // Next, all pipeline creation should be stopped.
- inOrder.verify(pipelineManager, count).freezePipelineCreation();
-
if (initialCheckpoint == FinalizationCheckpoint.FINALIZATION_STARTED) {
count = times(1);
}
@@ -267,20 +258,12 @@ public void testResumeFinalizationFromCheckpoint(
inOrder.verify(storage, count)
.setLayoutVersion(feature.layoutVersion());
inOrder.verify(storage, count).persistCurrentState();
- // After MLV == SLV, all datanodes should be moved to healthy readonly.
- if (feature.layoutVersion() ==
- HDDSLayoutVersionManager.maxLayoutVersion()) {
- inOrder.verify(nodeManager, count).forceNodesToHealthyReadOnly();
- }
inOrder.verify(buffer, count).addToBuffer(
eq(finalizationStore),
matches(OzoneConsts.LAYOUT_VERSION_KEY),
eq(String.valueOf(feature.layoutVersion())));
}
}
- // If this was not called in the loop, there was an error. To detect this
- // mistake, verify again here.
- verify(nodeManager, count).forceNodesToHealthyReadOnly();
if (initialCheckpoint == FinalizationCheckpoint.MLV_EQUALS_SLV) {
count = times(1);
@@ -351,19 +334,6 @@ private PipelineManager getMockPipelineManager(
when(pipelineManager.getPipelines(any(),
any())).thenReturn(Arrays.asList(null, null, null));
- // Set the initial value for pipeline creation based on the checkpoint.
- // In a real cluster, this would be set on startup of the
- // PipelineManagerImpl.
- pipelineCreationFrozen =
- !FinalizationManager.shouldCreateNewPipelines(inititalCheckpoint);
- doAnswer(args -> pipelineCreationFrozen = true)
- .when(pipelineManager).freezePipelineCreation();
- doAnswer(args -> pipelineCreationFrozen = false)
- .when(pipelineManager).resumePipelineCreation();
-
- doAnswer(args -> pipelineCreationFrozen)
- .when(pipelineManager).isPipelineCreationFrozen();
-
return pipelineManager;
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
index aa7c78b2e5a..d9ea4f57f89 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
@@ -17,11 +17,8 @@
package org.apache.hadoop.hdds.upgrade;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
-import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
import static org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.ALREADY_FINALIZED;
import static org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.FINALIZATION_DONE;
import static org.assertj.core.api.Assertions.assertThat;
@@ -31,16 +28,13 @@
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint;
@@ -60,10 +54,6 @@ public final class TestHddsUpgradeUtils {
private static final Logger LOG =
LoggerFactory.getLogger(TestHddsUpgradeUtils.class);
- private static final ReplicationConfig RATIS_THREE =
- ReplicationConfig.fromProtoTypeAndFactor(HddsProtos.ReplicationType.RATIS,
- HddsProtos.ReplicationFactor.THREE);
-
private TestHddsUpgradeUtils() { }
public static void waitForFinalizationFromClient(
@@ -117,32 +107,13 @@ public static void testPostUpgradeConditionsSCM(StorageContainerManager scm,
scmVersionManager.getMetadataLayoutVersion());
assertThat(scmVersionManager.getMetadataLayoutVersion()).isGreaterThanOrEqualTo(1);
- // SCM should not return from finalization until there is at least one
- // pipeline to use.
- PipelineManager scmPipelineManager = scm.getPipelineManager();
- try {
- GenericTestUtils.waitFor(
- () -> !scmPipelineManager.getPipelines(RATIS_THREE, OPEN).isEmpty(),
- 500, 60000);
- } catch (TimeoutException | InterruptedException e) {
- fail("Timeout waiting for Upgrade to complete on SCM.");
- }
-
- // SCM will not return from finalization until there is at least one
- // RATIS 3 pipeline. For this to exist, all three of our datanodes must
- // be in the HEALTHY state.
+ // SCM will not return from finalization until all HEALTHY datanodes
+ // have completed their finalization (MLV == SLV). This ensures datanodes
+ // are ready to serve requests even though containers may remain OPEN.
testDataNodesStateOnSCM(scm, numDatanodes, HEALTHY, HEALTHY_READONLY);
int countContainers = 0;
- for (ContainerInfo ci : scm.getContainerManager().getContainers()) {
- HddsProtos.LifeCycleState ciState = ci.getState();
- LOG.info("testPostUpgradeConditionsSCM: container state is {}",
- ciState.name());
- assertTrue((ciState == HddsProtos.LifeCycleState.CLOSED) ||
- (ciState == HddsProtos.LifeCycleState.CLOSING) ||
- (ciState == HddsProtos.LifeCycleState.DELETING) ||
- (ciState == HddsProtos.LifeCycleState.DELETED) ||
- (ciState == HddsProtos.LifeCycleState.QUASI_CLOSED));
+ for (ContainerInfo ignored : scm.getContainerManager().getContainers()) {
countContainers++;
}
assertThat(countContainers).isGreaterThanOrEqualTo(numContainers);
@@ -180,14 +151,6 @@ public static void testPreUpgradeConditionsDataNodes(
public static void testPostUpgradeConditionsDataNodes(
List<HddsDatanodeService> datanodes, int numContainers,
ContainerProtos.ContainerDataProto.State... validClosedContainerStates) {
- List<ContainerProtos.ContainerDataProto.State> closeStates =
- Arrays.asList(validClosedContainerStates);
- // Allow closed and quasi closed containers as valid closed containers by
- // default.
- if (closeStates.isEmpty()) {
- closeStates = Arrays.asList(CLOSED, QUASI_CLOSED);
- }
-
try {
GenericTestUtils.waitFor(() -> {
for (HddsDatanodeService dataNode : datanodes) {
@@ -217,12 +180,9 @@ public static void testPostUpgradeConditionsDataNodes(
dnVersionManager.getMetadataLayoutVersion());
assertThat(dnVersionManager.getMetadataLayoutVersion()).isGreaterThanOrEqualTo(1);
- // Also verify that all the existing containers are closed.
- for (Container<?> container :
+ // Verify containers are in acceptable states (OPEN is now allowed).
+ for (Container<?> ignored :
dsm.getContainer().getController().getContainers()) {
- assertTrue(closeStates.stream().anyMatch(
- state -> container.getContainerState().equals(state)),
- "Container had unexpected state " + container.getContainerState());
countContainers++;
}
}
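Since finalization no longer guarantees closed containers, a test that still
needs them closed has to wait for that condition itself. A hedged sketch in
the same GenericTestUtils.waitFor polling style the removed assertions used
(illustrative only; waitFor throws TimeoutException/InterruptedException):

    // Illustrative sketch only: poll until every container reports CLOSED,
    // rather than assuming finalization closed them.
    GenericTestUtils.waitFor(() ->
        scm.getContainerManager().getContainers().stream()
            .allMatch(ci -> ci.getState() == HddsProtos.LifeCycleState.CLOSED),
        500, 60000);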
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
index e4960cce160..da3cb82e68d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
@@ -88,6 +88,7 @@ public void init(OzoneConfiguration conf,
conf.setInt(SCMStorageConfig.TESTING_INIT_LAYOUT_VERSION_KEY,
HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
conf.set(ScmConfigKeys.OZONE_SCM_HA_RATIS_SERVER_RPC_FIRST_ELECTION_TIMEOUT,
"5s");
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "1s");
MiniOzoneHAClusterImpl.Builder clusterBuilder =
MiniOzoneCluster.newHABuilder(conf);
clusterBuilder.setNumOfStorageContainerManagers(NUM_SCMS)
@@ -330,45 +331,21 @@ private void checkMidFinalizationConditions(
// At least one node (leader) should be in the FINALIZATION_REQUIRED stage.
assertTrue(scms.stream().anyMatch(scm ->
scm.getScmContext().getFinalizationCheckpoint() ==
FinalizationCheckpoint.FINALIZATION_REQUIRED));
- // Pipeline creation should not be frozen at this point, even on leader.
- assertTrue(scms.stream().noneMatch(scm ->
- scm.getPipelineManager().isPipelineCreationFrozen()));
break;
case AFTER_PRE_FINALIZE_UPGRADE:
// At least one node (leader) should be in the FINALIZATION_STARTED stage.
assertTrue(scms.stream().anyMatch(scm ->
scm.getScmContext().getFinalizationCheckpoint() ==
FinalizationCheckpoint.FINALIZATION_STARTED));
- // Pipeline creation should be frozen on nodes where the finalization checkpoint is FINALIZATION_STARTED,
- // this should include the leader SCM.
- assertTrue(scms.stream()
- .filter(scm ->
- scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.FINALIZATION_STARTED)
- .allMatch(scm ->
- scm.getPipelineManager().isPipelineCreationFrozen()));
break;
case AFTER_COMPLETE_FINALIZATION:
// At least one node (leader) should be in the MLV_EQUALS_SLV stage.
assertTrue(scms.stream().anyMatch(scm ->
scm.getScmContext().getFinalizationCheckpoint() ==
FinalizationCheckpoint.MLV_EQUALS_SLV));
- // Pipeline creation should not be frozen on nodes where the finalization checkpoint is MLV_EQUALS_SLV,
- // this should include the leader SCM.
- assertTrue(scms.stream()
- .filter(scm ->
- scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.MLV_EQUALS_SLV)
- .noneMatch(scm ->
- scm.getPipelineManager().isPipelineCreationFrozen()));
break;
case AFTER_POST_FINALIZE_UPGRADE:
// At least one node (leader) should be in the FINALIZATION_COMPLETE stage.
assertTrue(scms.stream().anyMatch(scm ->
scm.getScmContext().getFinalizationCheckpoint() ==
FinalizationCheckpoint.FINALIZATION_COMPLETE));
- // Pipeline creation should not be frozen on nodes where the finalization checkpoint is FINALIZATION_COMPLETE,
- // this should include the leader SCM.
- assertTrue(scms.stream()
- .filter(scm ->
- scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.FINALIZATION_COMPLETE)
- .noneMatch(scm ->
- scm.getPipelineManager().isPipelineCreationFrozen()));
break;
default:
fail("Unknown halting point in test: " + haltingPoint);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]