rakeshadr commented on code in PR #10326:
URL: https://github.com/apache/ozone/pull/10326#discussion_r3370778417


##########
hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java:
##########
@@ -175,12 +179,188 @@ public void testUpdateContainerStateFromOpen() throws 
Exception {
 
     DatanodeDetails datanodeDetails = randomDatanodeDetails();
 
-    // First report with "CLOSED" replica state moves container state to
+    // First report with "CLOSING" replica state moves container state to
     // "CLOSING".
-    getContainerManager().checkAndAddNewContainer(containerID, State.CLOSED,
+    getContainerManager().checkAndAddNewContainer(containerID, State.CLOSING,
         datanodeDetails);
     assertEquals(CLOSING,
         getContainerManager().getContainer(containerID).getState());
+    assertFalse(getContainerManager().getPipelineToOpenContainer()
+        .containsKey(containerWithPipeline.getPipeline().getId()));
+  }
+
+  @Test
+  public void testOpenContainerReconcilesToClosedFromScmOnClosedDnReport()
+      throws Exception {
+    ContainerWithPipeline openContainer =
+        getTestContainer(113L, LifeCycleState.OPEN);
+    ContainerID containerID = openContainer.getContainerInfo().containerID();
+    getContainerManager().addNewContainer(openContainer);
+
+    when(getContainerManager().getScmClient()
+        .getContainerWithPipeline(containerID.getId()))
+        .thenReturn(getTestContainer(113L, CLOSED));
+
+    getContainerManager().checkAndAddNewContainer(containerID, State.CLOSED,
+        randomDatanodeDetails());
+
+    assertEquals(CLOSED,
+        getContainerManager().getContainer(containerID).getState());
+    assertFalse(getContainerManager().getPipelineToOpenContainer()
+        .containsKey(openContainer.getPipeline().getId()));
+  }
+
+  @Test
+  public void testOpenContainerNotUpdatedFromUnhealthyReplicaReports()
+      throws Exception {
+    for (State replicaState : new State[] {
+        State.UNHEALTHY, State.INVALID, State.DELETED}) {
+      ContainerWithPipeline containerWithPipeline =
+          getTestContainer(120L + replicaState.ordinal(), LifeCycleState.OPEN);
+      ContainerID containerID =
+          containerWithPipeline.getContainerInfo().containerID();
+      getContainerManager().addNewContainer(containerWithPipeline);
+
+      getContainerManager().checkAndAddNewContainer(containerID, replicaState,
+          randomDatanodeDetails());
+
+      assertEquals(LifeCycleState.OPEN,
+          getContainerManager().getContainer(containerID).getState());
+      assertTrue(getContainerManager().getPipelineToOpenContainer()
+          .containsKey(containerWithPipeline.getPipeline().getId()));
+    }
+  }
+
+  @Test
+  public void testClosingContainerAdvancesToQuasiClosedFromScm()
+      throws Exception {
+    assertClosingContainerAdvancesToScmState(101L, QUASI_CLOSED,
+        QUASI_CLOSED);
+  }
+
+  @Test
+  public void testClosingContainerAdvancesToClosedFromScm() throws Exception {
+    assertClosingContainerAdvancesToScmState(102L, CLOSED, CLOSED);
+  }
+
+  @Test
+  public void testClosingContainerAdvancesToDeletingFromScm()
+      throws Exception {
+    assertClosingContainerAdvancesToScmState(103L, DELETING, DELETING);
+  }
+
+  @Test
+  public void testClosingContainerAdvancesToDeletedFromScm() throws Exception {
+    assertClosingContainerAdvancesToScmState(104L, DELETED, DELETED);
+  }
+
+  @Test
+  public void testClosingContainerNotUpdatedFromUnhealthyReplicaReport()
+      throws Exception {
+    ContainerWithPipeline closingContainer = getTestContainer(105L, CLOSING);
+    ContainerID containerID = 
closingContainer.getContainerInfo().containerID();
+    getContainerManager().addNewContainer(closingContainer);
+
+    getContainerManager().checkAndAddNewContainer(containerID, State.UNHEALTHY,
+        randomDatanodeDetails());
+
+    assertEquals(CLOSING,
+        getContainerManager().getContainer(containerID).getState());
+  }
+
+  @Test
+  public void testRecoverDeletedContainerToClosedFromDnReport()
+      throws Exception {
+    assertDeletedContainerRecoversFromScm(106L, State.CLOSED, CLOSED);
+  }
+
+  @Test
+  public void testRecoverDeletedContainerToQuasiClosedFromDnReport()
+      throws Exception {
+    assertDeletedContainerRecoversFromScm(107L, State.QUASI_CLOSED,
+        QUASI_CLOSED);
+  }
+
+  @Test
+  public void testDeletedContainerRecoversFromOpenDnReportWhenScmIsLive()
+      throws Exception {
+    assertDeletedContainerRecoversFromScm(108L, State.OPEN, CLOSED);
+  }
+
+  @Test
+  public void testDeletedContainerNotRecoveredWhenScmIsNotLive()

Review Comment:
   can you rename to reflect the test scenario, ScmIsNotLive" is confusing.
   
   `testDeletedContainerNotRecoveredWhenScmIsNotClosedOrQuasiClosed`



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java:
##########
@@ -210,28 +215,161 @@ void transitionOpenToClosing(ContainerID containerID, 
ContainerInfo containerInf
         pipelineToOpenContainer.put(pipelineID, curCnt - 1);
       }
     }
-    updateContainerState(containerID, FINALIZE);  // OPEN → CLOSING
+    updateContainerState(containerID, FINALIZE);
   }
 
   /**
-   * Check if an OPEN container should move to CLOSING based on a healthy
-   * non-OPEN DN replica report.
+   * Check if Recon's container lifecycle state can be corrected based on a DN
+   * replica report and SCM's authoritative state.
+   *
+   * <p>Closed or quasi-closed DN reports wake Recon to query SCM and reconcile
+   * the existing container using SCM's authoritative state. Other healthy
+   * non-OPEN reports can still move an OPEN Recon container to CLOSING. For
+   * DELETED, any non-DELETED DN report can trigger recovery, but Recon still
+   * recovers only when SCM reports the container as live.
+   *
+   * @param containerID containerID to check
+   * @param state replica state reported by a DataNode
    */
   private void checkContainerStateAndUpdate(ContainerID containerID,
-                                            ContainerReplicaProto.State 
replicaState)
+                                            ContainerReplicaProto.State state)
       throws IOException, InvalidStateTransitionException {
     ContainerInfo containerInfo = getContainer(containerID);
     HddsProtos.LifeCycleState reconState = containerInfo.getState();
 
-    if (reconState != HddsProtos.LifeCycleState.OPEN
-        || replicaState == ContainerReplicaProto.State.OPEN
-        || !isHealthy(replicaState)) {
+    if (isClosedReplicaState(state)) {
+      reconcileContainerFromScm(containerID, containerInfo, reconState, state);

Review Comment:
   With Recon already having correct state CLOSED, querying SCM can be skipped.
   
   ```
       if (isClosedReplicaState(state) && (reconState != 
HddsProtos.LifeCycleState.CLOSED)) {
           reconcileContainerFromScm(containerID, containerInfo, reconState, 
state);
         }
         return;
       }
   ```



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java:
##########
@@ -210,28 +215,161 @@ void transitionOpenToClosing(ContainerID containerID, 
ContainerInfo containerInf
         pipelineToOpenContainer.put(pipelineID, curCnt - 1);
       }
     }
-    updateContainerState(containerID, FINALIZE);  // OPEN → CLOSING
+    updateContainerState(containerID, FINALIZE);
   }
 
   /**
-   * Check if an OPEN container should move to CLOSING based on a healthy
-   * non-OPEN DN replica report.
+   * Check if Recon's container lifecycle state can be corrected based on a DN
+   * replica report and SCM's authoritative state.
+   *
+   * <p>Closed or quasi-closed DN reports wake Recon to query SCM and reconcile
+   * the existing container using SCM's authoritative state. Other healthy
+   * non-OPEN reports can still move an OPEN Recon container to CLOSING. For
+   * DELETED, any non-DELETED DN report can trigger recovery, but Recon still
+   * recovers only when SCM reports the container as live.
+   *
+   * @param containerID containerID to check
+   * @param state replica state reported by a DataNode
    */
   private void checkContainerStateAndUpdate(ContainerID containerID,
-                                            ContainerReplicaProto.State 
replicaState)
+                                            ContainerReplicaProto.State state)
       throws IOException, InvalidStateTransitionException {
     ContainerInfo containerInfo = getContainer(containerID);
     HddsProtos.LifeCycleState reconState = containerInfo.getState();
 
-    if (reconState != HddsProtos.LifeCycleState.OPEN
-        || replicaState == ContainerReplicaProto.State.OPEN
-        || !isHealthy(replicaState)) {
+    if (isClosedReplicaState(state)) {
+      reconcileContainerFromScm(containerID, containerInfo, reconState, state);
+      return;
+    }
+
+    if (reconState == HddsProtos.LifeCycleState.DELETED) {
+      recoverDeletedContainerFromScm(containerID, state);
       return;
     }
 
-    LOG.info("Container {} is OPEN in Recon but DN reports replica state {}. "
-        + "Moving to CLOSING.", containerID, replicaState);
-    transitionOpenToClosing(containerID, containerInfo);
+    if (reconState == HddsProtos.LifeCycleState.OPEN
+        && state != ContainerReplicaProto.State.OPEN && isHealthy(state)) {
+      LOG.info("Container {} has state OPEN, but given state is {}.",
+          containerID, state);
+      transitionOpenToClosing(containerID, containerInfo);
+    }
+  }
+
+  private void reconcileContainerFromScm(ContainerID containerID,
+      ContainerInfo containerInfo, HddsProtos.LifeCycleState reconState,
+      ContainerReplicaProto.State replicaState)
+      throws IOException, InvalidStateTransitionException {
+    ContainerWithPipeline scmContainer =
+        scmClient.getContainerWithPipeline(containerID.getId());
+    HddsProtos.LifeCycleState scmState =
+        scmContainer.getContainerInfo().getState();
+
+    if (reconState == HddsProtos.LifeCycleState.DELETED) {
+      recoverDeletedContainerFromScm(containerID, replicaState, scmContainer);
+      return;
+    }
+
+    if (scmState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
+      reconcileToQuasiClosed(containerID, containerInfo, reconState);
+    } else if (scmState == HddsProtos.LifeCycleState.CLOSED) {
+      reconcileToClosed(containerID, containerInfo, reconState);
+    } else if (scmState == HddsProtos.LifeCycleState.DELETING
+        || scmState == HddsProtos.LifeCycleState.DELETED) {
+      reconcileToDeleted(containerID, containerInfo, reconState, scmState);
+    }
+  }
+
+  private void reconcileToQuasiClosed(ContainerID containerID,
+      ContainerInfo containerInfo, HddsProtos.LifeCycleState reconState)
+      throws IOException, InvalidStateTransitionException {
+    if (reconState == HddsProtos.LifeCycleState.OPEN) {
+      transitionOpenToClosing(containerID, containerInfo);
+      reconState = HddsProtos.LifeCycleState.CLOSING;
+    }
+    if (reconState == HddsProtos.LifeCycleState.CLOSING) {
+      updateContainerState(containerID, QUASI_CLOSE);
+      LOG.info("Container {} advanced to QUASI_CLOSED in Recon based on "
+          + "SCM state.", containerID);
+    }
+  }
+
+  private void reconcileToClosed(ContainerID containerID,
+      ContainerInfo containerInfo, HddsProtos.LifeCycleState reconState)
+      throws IOException, InvalidStateTransitionException {
+    if (reconState == HddsProtos.LifeCycleState.OPEN) {
+      transitionOpenToClosing(containerID, containerInfo);
+      reconState = HddsProtos.LifeCycleState.CLOSING;
+    }
+    if (reconState == HddsProtos.LifeCycleState.CLOSING) {
+      updateContainerState(containerID, CLOSE);
+      LOG.info("Container {} advanced to CLOSED in Recon based on SCM state.",
+          containerID);
+    } else if (reconState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
+      updateContainerState(containerID, FORCE_CLOSE);
+      LOG.info("Container {} advanced from QUASI_CLOSED to CLOSED in Recon "
+          + "based on SCM state.", containerID);
+    }
+  }
+
+  private void reconcileToDeleted(ContainerID containerID,
+      ContainerInfo containerInfo, HddsProtos.LifeCycleState reconState,
+      HddsProtos.LifeCycleState scmState)
+      throws IOException, InvalidStateTransitionException {
+    if (reconState == HddsProtos.LifeCycleState.OPEN) {
+      transitionOpenToClosing(containerID, containerInfo);
+      reconState = HddsProtos.LifeCycleState.CLOSING;
+    }
+    if (reconState == HddsProtos.LifeCycleState.CLOSING) {
+      updateContainerState(containerID, CLOSE);
+      reconState = HddsProtos.LifeCycleState.CLOSED;
+    }
+    if (reconState == HddsProtos.LifeCycleState.CLOSED
+        || reconState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
+      updateContainerState(containerID, DELETE);
+      reconState = HddsProtos.LifeCycleState.DELETING;

Review Comment:
   Question: Does Recon need to persist DELETING state?
   
   If the anwser is YES, it means DELETING is now a cross-cycle persistent 
state in Recon - a behavioral change that was not in Recon before this PR, am I 
correct?
   
   Can you please check the impact of this new state addition. 
   
   **Note:** Identified a few cases through claude LLM, please go through and 
do necessary changes.
   
   Case-1) DELETING counted as live container, is that okay?
   ```
   ClusterStateEndpoint.java
   
       // Subtract deleted containers from total containers.
       containerStateCounts.setTotalContainerCount(
           this.containerManager.getContainers().size() -
               containerStateCounts.getDeletedContainersCount());
   
   getContainers().size() returns ALL containers regardless of state. The 
subtraction only removes DELETED count. 
   ```
   
https://github.com/apache/ozone/blob/master/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java#L220
   
   case-2) update comment to reflect the state change.
   #reconcileExistingContainer() method is called for OPEN, QUASI_CLOSED, and 
CLOSED sync passes. If Recon has a container in DELETING, none of these passes 
advance it. Only #syncDeletedContainers() (which scans SCM's DELETED list) can 
complete the DELETING→DELETED transition.
   
   
https://github.com/apache/ozone/blob/master/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java#L246
   
   case-3) update comment to reflect the state change.
   
https://github.com/apache/ozone/blob/master/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java#L373
   
   case-4) update comment to reflect the state change: "CLOSED, QUASI_CLOSED, 
or DELETING."
   
https://github.com/apache/ozone/blob/master/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java#L344



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java:
##########
@@ -210,28 +215,161 @@ void transitionOpenToClosing(ContainerID containerID, 
ContainerInfo containerInf
         pipelineToOpenContainer.put(pipelineID, curCnt - 1);
       }
     }
-    updateContainerState(containerID, FINALIZE);  // OPEN → CLOSING
+    updateContainerState(containerID, FINALIZE);
   }
 
   /**
-   * Check if an OPEN container should move to CLOSING based on a healthy
-   * non-OPEN DN replica report.
+   * Check if Recon's container lifecycle state can be corrected based on a DN
+   * replica report and SCM's authoritative state.
+   *
+   * <p>Closed or quasi-closed DN reports wake Recon to query SCM and reconcile
+   * the existing container using SCM's authoritative state. Other healthy
+   * non-OPEN reports can still move an OPEN Recon container to CLOSING. For
+   * DELETED, any non-DELETED DN report can trigger recovery, but Recon still
+   * recovers only when SCM reports the container as live.
+   *
+   * @param containerID containerID to check
+   * @param state replica state reported by a DataNode
    */
   private void checkContainerStateAndUpdate(ContainerID containerID,
-                                            ContainerReplicaProto.State 
replicaState)
+                                            ContainerReplicaProto.State state)
       throws IOException, InvalidStateTransitionException {
     ContainerInfo containerInfo = getContainer(containerID);
     HddsProtos.LifeCycleState reconState = containerInfo.getState();
 
-    if (reconState != HddsProtos.LifeCycleState.OPEN
-        || replicaState == ContainerReplicaProto.State.OPEN
-        || !isHealthy(replicaState)) {
+    if (isClosedReplicaState(state)) {
+      reconcileContainerFromScm(containerID, containerInfo, reconState, state);
+      return;
+    }
+
+    if (reconState == HddsProtos.LifeCycleState.DELETED) {
+      recoverDeletedContainerFromScm(containerID, state);
       return;
     }
 
-    LOG.info("Container {} is OPEN in Recon but DN reports replica state {}. "
-        + "Moving to CLOSING.", containerID, replicaState);
-    transitionOpenToClosing(containerID, containerInfo);
+    if (reconState == HddsProtos.LifeCycleState.OPEN
+        && state != ContainerReplicaProto.State.OPEN && isHealthy(state)) {
+      LOG.info("Container {} has state OPEN, but given state is {}.",
+          containerID, state);
+      transitionOpenToClosing(containerID, containerInfo);
+    }
+  }
+
+  private void reconcileContainerFromScm(ContainerID containerID,
+      ContainerInfo containerInfo, HddsProtos.LifeCycleState reconState,
+      ContainerReplicaProto.State replicaState)
+      throws IOException, InvalidStateTransitionException {
+    ContainerWithPipeline scmContainer =
+        scmClient.getContainerWithPipeline(containerID.getId());
+    HddsProtos.LifeCycleState scmState =
+        scmContainer.getContainerInfo().getState();
+
+    if (reconState == HddsProtos.LifeCycleState.DELETED) {
+      recoverDeletedContainerFromScm(containerID, replicaState, scmContainer);
+      return;
+    }
+
+    if (scmState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
+      reconcileToQuasiClosed(containerID, containerInfo, reconState);
+    } else if (scmState == HddsProtos.LifeCycleState.CLOSED) {
+      reconcileToClosed(containerID, containerInfo, reconState);
+    } else if (scmState == HddsProtos.LifeCycleState.DELETING
+        || scmState == HddsProtos.LifeCycleState.DELETED) {
+      reconcileToDeleted(containerID, containerInfo, reconState, scmState);
+    }
+  }
+
+  private void reconcileToQuasiClosed(ContainerID containerID,
+      ContainerInfo containerInfo, HddsProtos.LifeCycleState reconState)
+      throws IOException, InvalidStateTransitionException {
+    if (reconState == HddsProtos.LifeCycleState.OPEN) {
+      transitionOpenToClosing(containerID, containerInfo);
+      reconState = HddsProtos.LifeCycleState.CLOSING;
+    }
+    if (reconState == HddsProtos.LifeCycleState.CLOSING) {
+      updateContainerState(containerID, QUASI_CLOSE);
+      LOG.info("Container {} advanced to QUASI_CLOSED in Recon based on "
+          + "SCM state.", containerID);
+    }
+  }
+
+  private void reconcileToClosed(ContainerID containerID,
+      ContainerInfo containerInfo, HddsProtos.LifeCycleState reconState)
+      throws IOException, InvalidStateTransitionException {
+    if (reconState == HddsProtos.LifeCycleState.OPEN) {
+      transitionOpenToClosing(containerID, containerInfo);
+      reconState = HddsProtos.LifeCycleState.CLOSING;
+    }
+    if (reconState == HddsProtos.LifeCycleState.CLOSING) {
+      updateContainerState(containerID, CLOSE);
+      LOG.info("Container {} advanced to CLOSED in Recon based on SCM state.",
+          containerID);
+    } else if (reconState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
+      updateContainerState(containerID, FORCE_CLOSE);
+      LOG.info("Container {} advanced from QUASI_CLOSED to CLOSED in Recon "
+          + "based on SCM state.", containerID);
+    }
+  }
+
+  private void reconcileToDeleted(ContainerID containerID,
+      ContainerInfo containerInfo, HddsProtos.LifeCycleState reconState,
+      HddsProtos.LifeCycleState scmState)
+      throws IOException, InvalidStateTransitionException {
+    if (reconState == HddsProtos.LifeCycleState.OPEN) {
+      transitionOpenToClosing(containerID, containerInfo);
+      reconState = HddsProtos.LifeCycleState.CLOSING;
+    }
+    if (reconState == HddsProtos.LifeCycleState.CLOSING) {
+      updateContainerState(containerID, CLOSE);
+      reconState = HddsProtos.LifeCycleState.CLOSED;
+    }
+    if (reconState == HddsProtos.LifeCycleState.CLOSED
+        || reconState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
+      updateContainerState(containerID, DELETE);
+      reconState = HddsProtos.LifeCycleState.DELETING;
+    }
+    if (scmState == HddsProtos.LifeCycleState.DELETED
+        && reconState == HddsProtos.LifeCycleState.DELETING) {
+      updateContainerState(containerID, CLEANUP);
+      LOG.info("Container {} advanced to {} in Recon based on SCM state.",
+          containerID, scmState);
+    }
+  }
+
+  private void recoverDeletedContainerFromScm(ContainerID containerID,
+      ContainerReplicaProto.State replicaState) throws IOException {
+    if (replicaState == ContainerReplicaProto.State.DELETED) {
+      return;
+    }
+
+    ContainerWithPipeline scmContainer =
+        scmClient.getContainerWithPipeline(containerID.getId());
+    recoverDeletedContainerFromScm(containerID, replicaState, scmContainer);
+  }
+
+  private void recoverDeletedContainerFromScm(ContainerID containerID,
+      ContainerReplicaProto.State replicaState, ContainerWithPipeline 
scmContainer)
+      throws IOException {
+    HddsProtos.LifeCycleState scmState =
+        scmContainer.getContainerInfo().getState();
+    if (scmState != HddsProtos.LifeCycleState.CLOSED
+        && scmState != HddsProtos.LifeCycleState.QUASI_CLOSED) {
+      LOG.info("Container {} is DELETED in Recon and DN reported {}, "
+          + "but SCM state is {}. Skipping recovery.",
+          containerID, replicaState, scmState);
+      return;
+    }
+
+    deleteContainer(containerID);

Review Comment:
   "deleteContainer + addNewContainer" in recovery is not atomic, better to log 
explicitly for better troubleshooting experience.
   
   ```
       deleteContainer(containerID);
       try {
         addNewContainer(scmContainer);
         LOG.info("Recovered container {} from DELETED to {} in Recon "
             + "(dnState={}, scmState={}).",
             containerID, scmState, replicaState, scmState);
       } catch (IOException e) {
         LOG.error("Container {} deleted from Recon but re-add as {} failed, "
             + "container absent until next SCM sync cycle.",
             containerID, scmState, e);
         throw e;
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to