rakeshadr commented on code in PR #10074:
URL: https://github.com/apache/ozone/pull/10074#discussion_r3199651576


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java:
##########
@@ -19,86 +19,731 @@
 
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLEANUP;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.DELETE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FORCE_CLOSE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.QUASI_CLOSE;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD_DEFAULT;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.recon.metrics.ReconScmContainerSyncMetrics;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Helper class that performs targeted incremental sync between SCM and Recon
+ * container metadata. Executes four passes per sync cycle:
+ *
+ * <ol>
+ *   <li><b>Pass 1 — CLOSED (SCM-driven, add + correct):</b> fetches SCM's
+ *       CLOSED container ID list, adds any absent from Recon, and corrects
+ *       containers that are OPEN or CLOSING in Recon but CLOSED in SCM.</li>
+ *   <li><b>Pass 2 — OPEN (SCM-driven, add only):</b> adds OPEN containers
+ *       that are absent from Recon entirely (e.g., created while Recon was
+ *       down).</li>
+ *   <li><b>Pass 3 — QUASI_CLOSED (SCM-driven, add only):</b> adds
+ *       QUASI_CLOSED containers absent from Recon. Requires that SCM returns
+ *       container metadata with a null pipeline when pipeline lookup fails, and
+ *       that Recon's {@code addNewContainer} handles a null pipeline gracefully;
+ *       otherwise QUASI_CLOSED containers whose pipelines have been cleaned up
+ *       will fail with {@code NullPointerException} or {@code IOException}.</li>
+ *   <li><b>Pass 4 — DELETED retirement (Recon-driven, transition only):</b>
+ *       scans Recon's CLOSED and QUASI_CLOSED containers in batches, queries
+ *       SCM for each, and transitions any that SCM reports as DELETED.
+ *       Intentionally Recon-driven (not SCM-driven) because SCM's DELETED
+ *       list grows unboundedly; starting from Recon's bounded set of
+ *       non-terminal containers is always more efficient.</li>
+ * </ol>
+ */
 class ReconStorageContainerSyncHelper {
 
   // Serialized size of one ContainerID proto on the wire (varint tag + 8-byte long = ~12 bytes).
   // Used to derive the maximum batch size that fits within ipc.maximum.data.length.
   private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12;
 
+  /**
+   * Rotating cursor for Pass 4 (DELETED retirement). Tracks the list position
+   * where the next sync cycle should begin so that all candidates are
+   * eventually covered regardless of batch size. Volatile because it is
+   * updated by the scheduler thread and read by tests.
+   */
+  private volatile int pass4BatchOffset = 0;
+  /**
+   * Monotonic cursor for Pass 2 (OPEN add-only sync). OPEN containers are
+   * created with increasing container IDs, so each cycle only needs to scan
+   * from the last-seen ID onward rather than rescanning the full OPEN set.
+   */
+  private volatile long pass2OpenStartContainerId = 1L;
+
   private static final Logger LOG = LoggerFactory
       .getLogger(ReconStorageContainerSyncHelper.class);
 
   private final StorageContainerServiceProvider scmServiceProvider;
   private final OzoneConfiguration ozoneConfiguration;
   private final ReconContainerManager containerManager;
+  private final ReconScmContainerSyncMetrics metrics;
+
+  /**
+   * Describes the action that the periodic scheduler should take based on the
+   * observed drift between SCM and Recon container metadata.
+   */
+  public enum SyncAction {
+    /**
+     * No drift detected — no sync work needed this cycle.
+     */
+    NO_ACTION,
+
+    /**
+     * Small or per-state drift detected — run the four-pass targeted sync.
+     * This is the normal steady-state response: cheaper than a full snapshot
+     * and sufficient for the vast majority of drift scenarios.
+     */
+    TARGETED_SYNC,
+
+    /**
+     * Large total-count drift detected — replace Recon's entire SCM DB with a

Review Comment:
   With the new changes, FULL_SNAPSHOT no longer does much: the scheduler just logs a warning and breaks, right?
   
   Can we remove FULL_SNAPSHOT entirely to make the code more readable and straightforward? We already have a follow-up Jira for the external trigger: https://issues.apache.org/jira/browse/HDDS-15165.
   
   In that case, `decideSyncAction()` would return only TARGETED_SYNC or NO_ACTION. Could we then record the metric and log the large-drift warning inside `decideSyncAction()`?
   
   Also, since there would no longer be a fullScmDbSnapshot case, the metrics need updating as well.
   
   Suggestion: rename the metric `recordFullSnapshotThresholdExceededEvent` to `recordLargeDriftThresholdExceededEvent`.
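
A minimal sketch of the two-valued decision suggested above, in case it helps the discussion. It is not the PR's code: the class name, the plain `long` parameters, and the `AtomicLong` counter standing in for `recordLargeDriftThresholdExceededEvent` are all assumptions. Returning NO_ACTION on large drift is only one possible reading, chosen here because it keeps the current warn-only behavior. For brevity, only OPEN per-state drift is checked.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the helper; not the actual Recon class.
class SyncDecisionSketch {

  // FULL_SNAPSHOT removed, as proposed in the review comment.
  enum SyncAction { NO_ACTION, TARGETED_SYNC }

  // Stands in for metrics.recordLargeDriftThresholdExceededEvent() (assumed name).
  static final AtomicLong LARGE_DRIFT_EVENTS = new AtomicLong();

  static SyncAction decideSyncAction(long scmTotal, long scmOpen,
      long reconTotal, long reconOpen,
      long largeThreshold, long perStateDriftThreshold) {
    long totalDrift = Math.abs(scmTotal - reconTotal);
    long scmNonOpen = Math.max(0, scmTotal - scmOpen);
    long reconNonOpen = Math.max(0, reconTotal - reconOpen);
    long nonOpenDrift = Math.abs(scmNonOpen - reconNonOpen);

    if (nonOpenDrift > largeThreshold) {
      // Metric and warning live here, so the scheduler needs no third branch.
      LARGE_DRIFT_EVENTS.incrementAndGet();
      System.err.printf("Non-OPEN container drift %d exceeds threshold %d; "
          + "no automatic repair is attempted.%n", nonOpenDrift, largeThreshold);
      // Assumption: keep the current warn-only behavior for large drift.
      return SyncAction.NO_ACTION;
    }
    if (totalDrift > 0) {
      return SyncAction.TARGETED_SYNC;
    }
    // Total counts match: fall back to the per-state check (OPEN only here).
    long openDrift = Math.abs(scmOpen - reconOpen);
    return openDrift > perStateDriftThreshold
        ? SyncAction.TARGETED_SYNC : SyncAction.NO_ACTION;
  }

  public static void main(String[] args) {
    // Small drift of 5 containers with a 1,000,000 large-drift threshold.
    System.out.println(decideSyncAction(100, 10, 95, 10, 1_000_000, 5));
  }
}
```

With this shape the scheduler switch has only two cases, and the large-drift signal is still observable through the counter and the log line.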



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java:
##########
@@ -19,86 +19,731 @@
 
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLEANUP;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.DELETE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FORCE_CLOSE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.QUASI_CLOSE;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD_DEFAULT;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.recon.metrics.ReconScmContainerSyncMetrics;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Helper class that performs targeted incremental sync between SCM and Recon
+ * container metadata. Executes four passes per sync cycle:
+ *
+ * <ol>
+ *   <li><b>Pass 1 — CLOSED (SCM-driven, add + correct):</b> fetches SCM's
+ *       CLOSED container ID list, adds any absent from Recon, and corrects
+ *       containers that are OPEN or CLOSING in Recon but CLOSED in SCM.</li>
+ *   <li><b>Pass 2 — OPEN (SCM-driven, add only):</b> adds OPEN containers
+ *       that are absent from Recon entirely (e.g., created while Recon was
+ *       down).</li>
+ *   <li><b>Pass 3 — QUASI_CLOSED (SCM-driven, add only):</b> adds
+ *       QUASI_CLOSED containers absent from Recon. Requires that SCM returns
+ *       container metadata with a null pipeline when pipeline lookup fails, and
+ *       that Recon's {@code addNewContainer} handles a null pipeline gracefully;
+ *       otherwise QUASI_CLOSED containers whose pipelines have been cleaned up
+ *       will fail with {@code NullPointerException} or {@code IOException}.</li>
+ *   <li><b>Pass 4 — DELETED retirement (Recon-driven, transition only):</b>
+ *       scans Recon's CLOSED and QUASI_CLOSED containers in batches, queries
+ *       SCM for each, and transitions any that SCM reports as DELETED.
+ *       Intentionally Recon-driven (not SCM-driven) because SCM's DELETED
+ *       list grows unboundedly; starting from Recon's bounded set of
+ *       non-terminal containers is always more efficient.</li>
+ * </ol>
+ */
 class ReconStorageContainerSyncHelper {
 
   // Serialized size of one ContainerID proto on the wire (varint tag + 8-byte long = ~12 bytes).
   // Used to derive the maximum batch size that fits within ipc.maximum.data.length.
   private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12;
 
+  /**
+   * Rotating cursor for Pass 4 (DELETED retirement). Tracks the list position
+   * where the next sync cycle should begin so that all candidates are
+   * eventually covered regardless of batch size. Volatile because it is
+   * updated by the scheduler thread and read by tests.
+   */
+  private volatile int pass4BatchOffset = 0;
+  /**
+   * Monotonic cursor for Pass 2 (OPEN add-only sync). OPEN containers are
+   * created with increasing container IDs, so each cycle only needs to scan
+   * from the last-seen ID onward rather than rescanning the full OPEN set.
+   */
+  private volatile long pass2OpenStartContainerId = 1L;
+
   private static final Logger LOG = LoggerFactory
       .getLogger(ReconStorageContainerSyncHelper.class);
 
   private final StorageContainerServiceProvider scmServiceProvider;
   private final OzoneConfiguration ozoneConfiguration;
   private final ReconContainerManager containerManager;
+  private final ReconScmContainerSyncMetrics metrics;
+
+  /**
+   * Describes the action that the periodic scheduler should take based on the
+   * observed drift between SCM and Recon container metadata.
+   */
+  public enum SyncAction {
+    /**
+     * No drift detected — no sync work needed this cycle.
+     */
+    NO_ACTION,
+
+    /**
+     * Small or per-state drift detected — run the four-pass targeted sync.
+     * This is the normal steady-state response: cheaper than a full snapshot
+     * and sufficient for the vast majority of drift scenarios.
+     */
+    TARGETED_SYNC,
+
+    /**
+     * Large total-count drift detected — replace Recon's entire SCM DB with a
+     * fresh checkpoint from SCM. Reserved for cases where targeted sync would
+     * be unreliable (e.g., Recon was down for hours and hundreds of containers
+     * changed state).
+     */
+    FULL_SNAPSHOT
+  }
 
   ReconStorageContainerSyncHelper(StorageContainerServiceProvider scmServiceProvider,
                                   OzoneConfiguration ozoneConfiguration,
                                   ReconContainerManager containerManager) {
+    this(scmServiceProvider, ozoneConfiguration, containerManager, null);
+  }
+
+  ReconStorageContainerSyncHelper(StorageContainerServiceProvider scmServiceProvider,
+                                  OzoneConfiguration ozoneConfiguration,
+                                  ReconContainerManager containerManager,
+                                  ReconScmContainerSyncMetrics metrics) {
     this.scmServiceProvider = scmServiceProvider;
     this.ozoneConfiguration = ozoneConfiguration;
     this.containerManager = containerManager;
+    this.metrics = metrics;
   }
 
+  /**
+   * Decides what sync action the periodic scheduler should take based on the
+   * observed drift between SCM and Recon.
+   *
+   * <p>Decision logic:
+   * <ol>
+   *   <li>If {@code |(SCM_total - SCM_open) - (Recon_total - Recon_open)| >
+   *       ozone.recon.scm.container.threshold} (default 1,000,000): return
+   *       {@link SyncAction#FULL_SNAPSHOT}. Large drift in non-OPEN containers
+   *       means Recon is badly behind on stable SCM state and a full checkpoint
+   *       replacement is cheaper and more reliable at that scale.</li>
+   *   <li>If total drift is positive but the non-OPEN drift is at or below the
+   *       threshold: return {@link SyncAction#TARGETED_SYNC}. This keeps large
+   *       OPEN-only gaps on the incremental path because missing OPEN
+   *       containers can be repaired cheaply without replacing the full SCM DB.</li>
+   *   <li>If total drift is zero, check per-state drift for each active
+   *       (non-terminal) lifecycle state against
+   *       {@code ozone.recon.scm.per.state.drift.threshold} (default 5):
+   *       <ul>
+   *         <li><b>OPEN</b>: detects containers stuck OPEN in Recon after SCM
+   *             has advanced them to QUASI_CLOSED or CLOSED.</li>
+   *         <li><b>QUASI_CLOSED</b>: detects containers stuck QUASI_CLOSED in
+   *             Recon after SCM has advanced them to CLOSED. This case produces
+   *             zero OPEN drift and is invisible to an OPEN-only check.</li>
+   *       </ul>
+   *       If drift in <em>any</em> checked state exceeds the threshold:
+   *       return {@link SyncAction#TARGETED_SYNC}.</li>
+   *   <li>Otherwise: return {@link SyncAction#NO_ACTION}.</li>
+   * </ol>
+   *
+   * <p>Per-state drift deliberately routes to targeted sync, not a full
+   * snapshot — the targeted sync's per-state passes correct each condition
+   * efficiently without replacing the entire database.
+   *
+   * @return the recommended {@link SyncAction}
+   * @throws IOException if SCM RPC calls to retrieve counts fail
+   */
+  public SyncAction decideSyncAction() throws IOException {
+    int largeThreshold = ozoneConfiguration.getInt(
+        OZONE_RECON_SCM_CONTAINER_THRESHOLD,
+        OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT);
+    int perStateDriftThreshold = ozoneConfiguration.getInt(
+        OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD,
+        OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD_DEFAULT);
+    List<ContainerInfo> reconContainers = containerManager.getContainers();
+    long reconTotal = reconContainers.size();
+    long reconOpen = reconContainers.stream()
+        .filter(c -> c.getState() == HddsProtos.LifeCycleState.OPEN)
+        .count();
+
+    // --- Check 1: large non-OPEN drift escalates to full snapshot ---
+    long scmTotal = scmServiceProvider.getContainerCount();
+    long scmOpen = scmServiceProvider.getContainerCount(HddsProtos.LifeCycleState.OPEN);
+    long totalDrift = Math.abs(scmTotal - reconTotal);
+    long scmNonOpen = Math.max(0, scmTotal - scmOpen);
+    long reconNonOpen = Math.max(0, reconTotal - reconOpen);
+    long nonOpenDrift = Math.abs(scmNonOpen - reconNonOpen);
+
+    if (nonOpenDrift > largeThreshold) {
+      LOG.warn("Non-OPEN container drift {} exceeds threshold {} "

Review Comment:
   In the large-drift case Recon makes **no repair**; it just logs a warning, repeated every interval indefinitely. Please raise a follow-up Jira task for a focused implementation, so we can discuss the algorithm in detail and review it carefully.
   
   **Brainstorming Qs:**
   _Can TARGETED_SYNC repair large drift?_
   
   Triggering TARGETED_SYNC to repair large drift could add load on Recon and SCM similar to FULL_SNAPSHOT. Say the drift is 50M containers: a TARGETED_SYNC could be capped (ozone.recon.scm.targeted.sync.max.containers.per.cycle, default 10_000) and spread over multiple cycles, so that large drift is repaired gradually across multiple interval cycles rather than all at once.
   
   IIUC, below is the existing throttling mechanism:
   [ReconServerConfigKeys - batch.size](https://github.com/apache/ozone/blob/master/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java#L254)
   OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 1_000_000
   ```
   Say, 50M:
   totalClosed = 50,000,000
   batchSize   = 1,000,000
   Iteration 1:  fetch IDs 1          → 1,000,000   → process 1M containers
   Iteration 2:  fetch IDs 1,000,001  → 2,000,000   → process 1M containers
   ...
   Iteration 50: fetch IDs 49,000,001 → 50,000,000  → process 1M containers
   Total: 50 RPCs to SCM, all in one sync cycle, back-to-back, no pause
   ```
   
   Possibly we can introduce a separate config key, ozone.recon.scm.targeted.sync.max.containers.per.cycle. TARGETED_SYNC could then repair large drift gradually without overloading SCM, and without the risk of the DB-swap complications discussed earlier.
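
The capped approach could look roughly like the sketch below. It is only an illustration under stated assumptions: `CappedSyncSketch`, `runCycle`, and the resume cursor are hypothetical names, `maxPerCycle` plays the role of the proposed ozone.recon.scm.targeted.sync.max.containers.per.cycle key, and container IDs are treated as one contiguous range, which real SCM state lists are not. Each call to `runCycle` stands for one scheduler interval and returns the ID ranges it would have fetched.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a per-cycle cap with a resumable cursor.
class CappedSyncSketch {
  private final long batchSize;    // containers per SCM RPC
  private final long maxPerCycle;  // proposed per-cycle cap (assumed config key)
  private final long highestId;    // stand-in for the last container ID to sync
  private long nextStartId = 1L;   // where the next cycle resumes

  CappedSyncSketch(long batchSize, long maxPerCycle, long highestId) {
    this.batchSize = batchSize;
    this.maxPerCycle = maxPerCycle;
    this.highestId = highestId;
  }

  // Runs one sync cycle and returns the inclusive [start, end] ranges fetched.
  List<long[]> runCycle() {
    List<long[]> fetched = new ArrayList<>();
    long processed = 0;
    while (processed < maxPerCycle && nextStartId <= highestId) {
      // Never request more than the remaining per-cycle budget.
      long want = Math.min(batchSize, maxPerCycle - processed);
      long end = Math.min(highestId, nextStartId + want - 1);
      fetched.add(new long[] {nextStartId, end});
      processed += end - nextStartId + 1;
      nextStartId = end + 1;
    }
    return fetched;
  }

  boolean isDone() {
    return nextStartId > highestId;
  }

  public static void main(String[] args) {
    CappedSyncSketch sync = new CappedSyncSketch(4_000, 10_000, 25_000);
    int cycle = 0;
    while (!sync.isDone()) {
      cycle++;
      for (long[] range : sync.runCycle()) {
        System.out.printf("cycle %d: fetch IDs %d to %d%n", cycle, range[0], range[1]);
      }
    }
  }
}
```

One consequence worth weighing: with a 50M drift and a cap of 10_000, full repair takes 50,000,000 / 10,000 = 5,000 cycles, so the default cap would need to be chosen together with the sync interval.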



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

