This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f5d2a0f548a HDDS-13590. Refactor HealthyPipelineSafeModeRule to not use PipelineReportFromDatanode (#9651)
f5d2a0f548a is described below

commit f5d2a0f548a39ddccbd606c3039f88d8b02e8565
Author: Priyesh Karatha <[email protected]>
AuthorDate: Tue Mar 3 22:38:49 2026 +0530

    HDDS-13590. Refactor HealthyPipelineSafeModeRule to not use PipelineReportFromDatanode (#9651)
---
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |  82 ++++++++++++++-
 .../hadoop/hdds/scm/safemode/SafeModeMetrics.java  |   8 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java  | 113 +++++++++++++++++++++
 3 files changed, 199 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 94964df73a9..3e590013c11 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -123,13 +123,18 @@ protected synchronized boolean validate() {
       LOG.info("All SCM pipelines are closed due to ongoing upgrade " +
           "finalization. Bypassing healthy pipeline safemode rule.");
       return true;
-    } else {
-      return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
     }
+    if (!validateBasedOnReportProcessing()) {
+      return validateHealthyPipelineSafeModeRuleUsingPipelineManager();
+    }
+    return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
   }
 
   @Override
   protected synchronized void process(Pipeline pipeline) {
+    if (!validateBasedOnReportProcessing()) {
+      return;
+    }
     Objects.requireNonNull(pipeline, "pipeline == null");
 
     // When SCM is in safe mode for long time, already registered
@@ -237,6 +242,61 @@ private synchronized void initializeRule(boolean refresh) {
         healthyPipelineThresholdCount);
   }
 
+  private boolean validateHealthyPipelineSafeModeRuleUsingPipelineManager() {
+    // Query PipelineManager directly for healthy pipeline count
+    List<Pipeline> openPipelines = pipelineManager.getPipelines(
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
+        Pipeline.PipelineState.OPEN);
+    
+    LOG.debug("Found {} open RATIS/THREE pipelines", openPipelines.size());
+
+    int pipelineCount = openPipelines.size();
+    healthyPipelineThresholdCount = Math.max(minHealthyPipelines,
+        (int) Math.ceil(healthyPipelinesPercent * pipelineCount));
+
+    currentHealthyPipelineCount = (int) openPipelines.stream()
+        .filter(this::isPipelineHealthy)
+        .count();
+
+    getSafeModeMetrics().setNumCurrentHealthyPipelines(currentHealthyPipelineCount);
+    boolean isValid = currentHealthyPipelineCount >= healthyPipelineThresholdCount;
+    if (scmInSafeMode()) {
+      LOG.info("SCM in safe mode. Healthy pipelines: {}, threshold: {}, rule satisfied: {}",
+          currentHealthyPipelineCount, healthyPipelineThresholdCount, isValid);
+    } else {
+      LOG.debug("SCM not in safe mode. Healthy pipelines: {}, threshold: {}",
+          currentHealthyPipelineCount, healthyPipelineThresholdCount);
+    }
+    return isValid;
+  }
+
+  boolean isPipelineHealthy(Pipeline pipeline) {
+    // Verify pipeline has all 3 nodes
+    List<DatanodeDetails> nodes = pipeline.getNodes();
+    if (nodes.size() != 3) {
+      LOG.debug("Pipeline {} is not healthy: has {} nodes instead of 3",
+          pipeline.getId(), nodes.size());
+      return false;
+    }
+
+    // Verify all nodes are healthy
+    for (DatanodeDetails dn : nodes) {
+      try {
+        NodeStatus status = nodeManager.getNodeStatus(dn);
+        if (!status.equals(NodeStatus.inServiceHealthy())) {
+          LOG.debug("Pipeline {} is not healthy: DN {} has status - Health: {}, Operational State: {}",
+              pipeline.getId(), dn.getUuidString(), status.getHealth(), status.getOperationalState());
+          return false;
+        }
+      } catch (NodeNotFoundException e) {
+        LOG.warn("Pipeline {} is not healthy: DN {} not found in node manager",
+            pipeline.getId(), dn.getUuidString());
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Override
   protected synchronized void cleanup() {
     processedPipelineIDs.clear();
@@ -265,6 +325,24 @@ public String getStatusText() {
 
   private synchronized String updateStatusTextWithSamplePipelines(
       String status) {
+    if (validateBasedOnReportProcessing()) {
+      List<Pipeline> openPipelines = pipelineManager.getPipelines(
+          RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
+          Pipeline.PipelineState.OPEN);
+
+      Set<PipelineID> unhealthyPipelines = openPipelines.stream()
+          .filter(p -> !isPipelineHealthy(p))
+          .map(Pipeline::getId)
+          .limit(SAMPLE_PIPELINE_DISPLAY_LIMIT)
+          .collect(Collectors.toSet());
+
+      if (!unhealthyPipelines.isEmpty()) {
+        String samplePipelineText =
+            "Sample pipelines not satisfying the criteria : " + unhealthyPipelines;
+        status = status.concat("\n").concat(samplePipelineText);
+      }
+      return status;
+    }
     Set<PipelineID> samplePipelines =
         unProcessedPipelineSet.stream().limit(SAMPLE_PIPELINE_DISPLAY_LIMIT)
             .collect(Collectors.toSet());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeMetrics.java
index ae65eafcb91..1f1daaae09b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeMetrics.java
@@ -47,7 +47,7 @@ public class SafeModeMetrics {
 
   // Pipeline metrics for safemode
   private @Metric MutableGaugeLong numHealthyPipelinesThreshold;
-  private @Metric MutableCounterLong currentHealthyPipelinesCount;
+  private @Metric MutableGaugeLong currentHealthyPipelinesCount;
   private @Metric MutableGaugeLong
       numPipelinesWithAtleastOneReplicaReportedThreshold;
   private @Metric MutableCounterLong
@@ -72,6 +72,10 @@ public void incCurrentHealthyPipelinesCount() {
     this.currentHealthyPipelinesCount.incr();
   }
 
+  public void setNumCurrentHealthyPipelines(long val) {
+    this.currentHealthyPipelinesCount.set(val);
+  }
+
   public void setNumPipelinesWithAtleastOneReplicaReportedThreshold(long val) {
     this.numPipelinesWithAtleastOneReplicaReportedThreshold.set(val);
   }
@@ -117,7 +121,7 @@ MutableGaugeLong getNumHealthyPipelinesThreshold() {
     return numHealthyPipelinesThreshold;
   }
 
-  MutableCounterLong getCurrentHealthyPipelinesCount() {
+  MutableGaugeLong getCurrentHealthyPipelinesCount() {
     return currentHealthyPipelinesCount;
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index ec97e4df6a9..b1312512d56 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.hdds.scm.safemode;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -295,6 +296,118 @@ public void testHealthyPipelineSafeModeRuleWithMixedPipelines()
 
   }
 
+  @Test
+  public void testHealthyPipelineThresholdIncreasesWithMorePipelinesAndReports()
+      throws Exception {
+    EventQueue eventQueue = new EventQueue();
+    SCMServiceManager serviceManager = new SCMServiceManager();
+    SCMContext scmContext = SCMContext.emptyContext();
+    List<ContainerInfo> containers =
+        new ArrayList<>(HddsTestUtils.getContainerInfo(1));
+
+    OzoneConfiguration config = new OzoneConfiguration();
+    MockNodeManager nodeManager = new MockNodeManager(true, 12);
+    ContainerManager containerManager = mock(ContainerManager.class);
+    when(containerManager.getContainers()).thenReturn(containers);
+    config.set(HddsConfigKeys.OZONE_METADATA_DIRS, tempFile.getPath());
+    config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+    config.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
+        0.5);
+    config.setInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_DATANODE, 0);
+
+    SCMMetadataStore scmMetadataStore = new SCMMetadataStoreImpl(config);
+    try {
+      PipelineManagerImpl pipelineManager =
+          PipelineManagerImpl.newPipelineManager(
+              config,
+              SCMHAManagerStub.getInstance(true),
+              nodeManager,
+              scmMetadataStore.getPipelineTable(),
+              eventQueue,
+              scmContext,
+              serviceManager,
+              Clock.system(ZoneOffset.UTC));
+
+      PipelineProvider mockRatisProvider =
+          new MockRatisPipelineProvider(nodeManager,
+              pipelineManager.getStateManager(), config);
+      pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+          mockRatisProvider);
+
+      // Create all pipelines before SCM enters safe mode. Pipeline creation is
+      // blocked once safe mode prechecks have not passed.
+      Pipeline pipeline1 =
+          pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
+              ReplicationFactor.THREE));
+      Pipeline pipeline2 =
+          pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
+              ReplicationFactor.THREE));
+      Pipeline pipeline3 =
+          pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
+              ReplicationFactor.THREE));
+
+      // Start with one healthy open pipeline. Threshold is small at this point.
+      pipelineManager.openPipeline(pipeline1.getId());
+      pipeline1 = pipelineManager.getPipeline(pipeline1.getId());
+      MockRatisPipelineProvider.markPipelineHealthy(pipeline1);
+
+      SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(config,
+          nodeManager, pipelineManager, containerManager, serviceManager,
+          eventQueue, scmContext);
+      scmSafeModeManager.start();
+
+      HealthyPipelineSafeModeRule healthyPipelineSafeModeRule =
+          SafeModeRuleFactory.getInstance()
+              .getSafeModeRule(HealthyPipelineSafeModeRule.class);
+      healthyPipelineSafeModeRule.setValidateBasedOnReportProcessing(false);
+
+      firePipelineEvent(pipeline1, eventQueue);
+      assertTrue(healthyPipelineSafeModeRule.validate());
+      assertEquals(1, healthyPipelineSafeModeRule.getHealthyPipelineThresholdCount());
+
+      // Open more pipelines so threshold increases.
+      pipelineManager.openPipeline(pipeline2.getId());
+      pipeline2 = pipelineManager.getPipeline(pipeline2.getId());
+      MockRatisPipelineProvider.markPipelineHealthy(pipeline2);
+
+      pipelineManager.openPipeline(pipeline3.getId());
+      pipeline3 = pipelineManager.getPipeline(pipeline3.getId());
+      MockRatisPipelineProvider.markPipelineHealthy(pipeline3);
+
+      // Simulate DN reports causing pipelines to be considered unhealthy.
+      for (DatanodeDetails dn : pipeline2.getNodes()) {
+        nodeManager.setNodeState(dn, HddsProtos.NodeState.DEAD);
+      }
+      for (DatanodeDetails dn : pipeline3.getNodes()) {
+        nodeManager.setNodeState(dn, HddsProtos.NodeState.DEAD);
+      }
+      firePipelineEvent(pipeline2, eventQueue);
+      firePipelineEvent(pipeline3, eventQueue);
+
+      assertFalse(healthyPipelineSafeModeRule.validate());
+      assertEquals(2, healthyPipelineSafeModeRule.getHealthyPipelineThresholdCount());
+
+      // Simulate more DN reports and recovery to healthy state, then exit rule.
+      for (DatanodeDetails dn : pipeline1.getNodes()) {
+        nodeManager.setNodeState(dn, HddsProtos.NodeState.HEALTHY);
+      }
+      for (DatanodeDetails dn : pipeline2.getNodes()) {
+        nodeManager.setNodeState(dn, HddsProtos.NodeState.HEALTHY);
+      }
+      for (DatanodeDetails dn : pipeline3.getNodes()) {
+        nodeManager.setNodeState(dn, HddsProtos.NodeState.HEALTHY);
+      }
+      firePipelineEvent(pipeline1, eventQueue);
+      firePipelineEvent(pipeline2, eventQueue);
+      firePipelineEvent(pipeline3, eventQueue);
+
+      assertTrue(healthyPipelineSafeModeRule.validate());
+      assertEquals(2, healthyPipelineSafeModeRule.getHealthyPipelineThresholdCount());
+    } finally {
+      scmMetadataStore.getStore().close();
+    }
+  }
+
   @Test
   public void testPipelineIgnoredWhenDnIsUnhealthy() throws Exception {
     EventQueue eventQueue = new EventQueue();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to