Copilot commented on code in PR #10412:
URL: https://github.com/apache/ozone/pull/10412#discussion_r3341275430


##########
HDDS-8342-Evaluation-And-Design.md:
##########
@@ -0,0 +1,90 @@
+# Evaluation of Branch HDDS-8342 & Design for Resumable Lifecycle Scans
+
+## 1. Evaluation of Current Implementation (HDDS-8342)
+
+The `HDDS-8342` branch introduces the `KeyLifecycleService`, a background 
service running on the Ozone Manager (OM) Leader to enforce bucket lifecycle 
rules (expiration, moving to trash, and aborting incomplete multipart uploads).
+
+### Strengths:
+- **Comprehensive Coverage:** Handles OBS/Legacy buckets via sequential key 
iteration and FSO buckets via a Depth-First Search (DFS) to safely delete empty 
directories.
+- **Batching:** Efficiently batches delete requests and submits them via 
Ratis, respecting the `ratisByteLimit`.
+- **Fault Injection & Metrics:** Good use of metrics and fault injectors for 
testing.
+
+### Identified Issues & Limitations:
+1. **Lack of Resumability:** The entire bucket is scanned in a single `call()` 
execution. If the OM restarts, crashes, or a leader transfer occurs, the scan 
state is lost. The new leader must restart the scan from the beginning. For 
buckets with billions of keys, the scan may never complete if leader transfers 
happen periodically.
+2. **Thread Starvation (No Yielding):** The `call()` method loops over the 
entire bucket. This blocks the background service thread for a very long time, 
preventing other buckets or background tasks from getting fair CPU time.
+3. **Redundant FSO Scanning:** FSO buckets are evaluated rule by rule (for 
rules with prefixes). If multiple rules have different prefixes, the DFS 
traverses overlapping paths, which is less efficient than a single unified pass.
+
+---
+
+## 2. Design: Persisting Bucket Scan Pointers
+
+To solve the resumability and thread starvation issues, we need to persist the 
scan progress (the "pointer") to the OM DB. This ensures that a new OM leader 
can resume from where the previous leader left off, and allows the task to 
yield cooperatively.
+
+### 2.1 Data Structure for the Scan Pointer
+We will define a new Protobuf message `LifecycleScanStateProto` to capture the 
exact position of the scan.
+
+```protobuf
+message LifecycleScanStateProto {
+  optional string bucketKey = 1;       // e.g., /volume/bucket
+  optional uint64 scanStartTime = 2;   // Epoch time when this full scan 
started
+  
+  // Phase indicator
+  enum ScanPhase {
+    OBS_KEY_SCAN = 1;
+    FSO_DFS_SCAN = 2;
+    MPU_SCAN = 3;
+    COMPLETED = 4;
+  }
+  optional ScanPhase phase = 3;
+
+  // For OBS/Legacy (Sequential Scan)
+  optional string lastScannedKey = 4;
+
+  // For FSO (DFS Scan)
+  optional uint64 currentRuleIndex = 5;      // Which rule is currently being 
evaluated
+  optional uint64 lastScannedDirObjId = 6;   // The directory currently being 
processed
+  optional string lastScannedFsoKey = 7;     // The last key evaluated in the 
current directory
+  optional string lastScannedSubDir = 8;     // The last subdirectory 
evaluated in the current directory
+  
+  // For MPU Scan
+  optional string lastScannedMpuKey = 9;
+}
+```
+
+### 2.2 OM Database Schema Updates
+Add a new table to `OMMetadataManager` to store the scan states:
+- **Table Name:** `lifecycleStateTable`
+- **Key:** `bucketKey` (String, e.g., `/volumeName/bucketName`)
+- **Value:** `LifecycleScanState` (Parsed from `LifecycleScanStateProto`)
+
+### 2.3 When to Persist the Pointer (Checkpointing)
+Persisting the pointer for every key would overwhelm Ratis and RocksDB. We 
should checkpoint periodically:
+
+1. **Piggybacking on Deletes:** Add an optional `LifecycleScanStateProto` 
field to `DeleteKeysRequest` and `RenameKeyRequest` (when moving to trash). 
When the OM state machine applies the deletion, it atomically updates the 
`lifecycleStateTable` with the new pointer. This guarantees exactly-once 
semantics for the scan pointer relative to deletions.
+2. **Periodic Standalone Checkpoints:** If no keys are expired (e.g., scanning 
millions of valid keys), we still need to save progress. We introduce a new OM 
request `SaveLifecycleScanStateRequest`. The `LifecycleActionTask` will send 
this request periodically (e.g., every 100,000 keys iterated, or every 1 minute 
of execution time).
+3. **End of Scan:** When the scan for a bucket finishes, a 
`SaveLifecycleScanStateRequest` is sent to mark the phase as `COMPLETED` and 
record the completion time.
+
+### 2.4 How to Resume the Scan
+When `KeyLifecycleService` schedules a `LifecycleActionTask` for a bucket, it 
first reads the `LifecycleScanState` from the `lifecycleStateTable`.
+
+- **OBS/Legacy Resumption:** 
+  The iterator for `keyTable` is initialized to seek to `lastScannedKey` 
instead of the bucket prefix.
+  ```java
+  TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyTblItr 
= keyTable.iterator(bucketPrefix);
+  if (state.getLastScannedKey() != null) {
+      keyTblItr.seek(state.getLastScannedKey());
+      // skip the exact match since it was already processed
+  }
+  ```
+
+- **FSO Resumption:**
+  Rebuilding the DFS stack is required. Using the `lastScannedDirObjId`, the 
task queries the `directoryTable` to find the directory, then traverses its 
parent pointers up to the bucket root to reconstruct the `LimitedSizeStack`. 
+  Once the stack is rebuilt, the task resumes iterating keys in 
`lastScannedDirObjId` starting from `lastScannedFsoKey`, and subdirectories 
starting from `lastScannedSubDir`.
+
+- **MPU Resumption:**
+  The `multipartInfoTable` iterator seeks to `lastScannedMpuKey` and continues.
+
+### 2.5 Task Yielding (Cooperative Multitasking)
+With resumability in place, we can fix the thread starvation issue.
+- The `LifecycleActionTask` should track its execution time. After running for 
a maximum time slice (e.g., 2 minutes) or processing a maximum number of keys, 
the task saves its state via `SaveLifecycleScanStateRequest` and returns 
`EmptyTaskResult`.
+- The `KeyLifecycleService` will re-schedule the bucket in the next queue 
polling cycle, allowing other buckets or background tasks to get fair CPU time.

Review Comment:
   This design document appears to have been accidentally committed at the 
repository root. Design/evaluation docs should not be part of the source tree 
(especially under the repo root). Please remove this file from the PR.



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java:
##########
@@ -117,6 +124,7 @@
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.OzoneTestBase;
 import org.apache.ratis.util.ExitUtils;
+import org.eclipse.jetty.io.ByteBufferPool;

Review Comment:
   Unused (and incorrect) import of `org.eclipse.jetty.io.ByteBufferPool`. This 
class is not referenced anywhere in this test file and should be removed.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -319,76 +324,107 @@ public BackgroundTaskResult call() {
           return result;
         }
 
-        List<OmLCRule> originRuleList = policy.getRules();
-        // remove disabled rules
-        List<OmLCRule> ruleList = originRuleList.stream().filter(r -> 
r.isEnabled()).collect(Collectors.toList());
-
-        List<OmLCRule> expirationRules = ruleList.stream()
-            .filter(r -> r.getExpiration() != null)
-            .collect(Collectors.toList());
-        List<OmLCRule> mpuRules = ruleList.stream()
-            .filter(r -> r.getAbortIncompleteMultipartUpload() != null)
-            .collect(Collectors.toList());
-
-        if (!expirationRules.isEmpty()) {
-          LimitedExpiredObjectList expiredKeyList = new 
LimitedExpiredObjectList(listMaxSize);
-          LimitedExpiredObjectList expiredDirList = new 
LimitedExpiredObjectList(listMaxSize);
-          Table<String, OmKeyInfo> keyTable = 
omMetadataManager.getKeyTable(bucket.getBucketLayout());
-          /**
-           * Filter treatment.
-           * ""  - all objects
-           * "/" - if it's OBS/Legacy, means keys starting with "/"; If it's 
FSO, not supported
-           * "/key" - if it's OBS/Legacy, means keys starting with "/key", "/" 
is literally "/";
-           *          If it's FSO, means keys or dirs starting with "key", "/" 
will be treated as separator mark.
-           * "key" - if it's OBS/Legacy, means keys starting with "key";
-           *         if it's FSO, means keys for dirs starting with "key" too.
-           * "dir/" - if it's OBS/Legacy, means keys starting with "dir/";
-           *        - if it's FSO, means keys/dirs under directory "dir", 
doesn't include directory "dir" itself.
-           *        - For FSO bucket, as directory ModificationTime will not 
be updated when any of its child key/subdir
-           *          changes, so remember to add the tailing slash "/" when 
configure prefix, otherwise the whole
-           *          directory will be expired and deleted once its 
ModificationTime meats the condition.
-           */
-          if (bucket.getBucketLayout() == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
-            OmVolumeArgs volume;
-            try {
-              volume = 
omMetadataManager.getVolumeTable().get(omMetadataManager.getVolumeKey(bucket.getVolumeName()));
-              if (volume == null) {
-                LOG.warn("Volume {} cannot be found, might be deleted during 
this task's execution",
-                    bucket.getVolumeName());
+        OmLifecycleScanState.Builder scanStateBuilder = null;
+        try {
+          OmLifecycleScanState scanState = 
omMetadataManager.getLifecycleScanStateTable().get(bucketKey);
+          if (scanState == null || (scanState != null && 
(scanState.getBucketObjID() != bucket.getObjectID() ||
+              scanState.getLifecycleConfigurationUpdateID() != 
policy.getUpdateID() ||
+              scanState.getScanEndTime() != null))) {

Review Comment:
   The condition `scanState == null || (scanState != null && (...))` has a 
redundant `scanState != null` check inside the OR — once the first disjunct is 
false, `scanState` is guaranteed non-null. Simplify to `scanState == null || 
scanState.getBucketObjID() != bucket.getObjectID() || 
scanState.getLifecycleConfigurationUpdateID() != policy.getUpdateID() || 
scanState.getScanEndTime() != null`.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -479,30 +520,82 @@ private void evaluateFSOBucket(OmVolumeArgs volume, 
OmBucketInfo bucket, String
             }
           }
         } else {
-          evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null,
-              Arrays.asList(rule), expiredKeyList, expiredDirList);
+          evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null, "",
+              Arrays.asList(rule), expiredKeyList, expiredDirList, 
scanStateBuilder);
         }
       }
 
       if (!noPrefixRuleList.isEmpty()) {
-        evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null,
-            noPrefixRuleList, expiredKeyList, expiredDirList);
+        evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null, "",
+            noPrefixRuleList, expiredKeyList, expiredDirList, 
scanStateBuilder);
+      }
+    }
+
+    private boolean canSkipDir(OmDirectoryInfo currentDir, String 
currentDirTableKey, DirectoryList dirList) {
+      // currentDir null is bucket root
+      if (currentDir == null) {
+        return false;
+      }
+
+      int count = dirList.getSubDirCount();
+      // if currentDir is equal to lastScannedDir
+      if (currentDir.getObjectID() == dirList.getSubDirList().get(count - 
1).getObjectID()) {
+        return false;
+      }
+
+      // if currentDir is parent of lastScannedDir
+      long currentObjID = currentDir.getObjectID();
+      for (int i = 0; i < count; i++) {
+        OmDirectoryInfo dir = dirList.getSubDirList().get(i);
+        if (dir.getObjectID() == currentObjID) {
+          return false;
+        }
       }
+
+      // if currentDir and lastScannedDir has same parent
+      do {
+        long parentID = currentDir.getParentObjectID();
+        for (int i = 0; i < count; i++) {
+          OmDirectoryInfo dir = dirList.getSubDirList().get(i);
+          if (dir.getParentObjectID() == parentID) {
+            if 
(dirList.getSubDirKeyList().get(i).compareTo(currentDirTableKey) < 0) {
+              return true;
+            }
+          }
+        }
+        return false;
+      } while(true);
     }
 
     @SuppressWarnings({"checkstyle:parameternumber", 
"checkstyle:MethodLength"})
     private void evaluateKeyAndDirTable(OmBucketInfo bucket, long volumeObjId, 
Table<String, OmKeyInfo> keyTable,
-        String directoryPath, @Nullable OmDirectoryInfo dir, List<OmLCRule> 
ruleList, LimitedExpiredObjectList keyList,
-        LimitedExpiredObjectList dirList) {
+        String directoryPath, @Nullable OmDirectoryInfo dir, String dirKey, 
List<OmLCRule> ruleList,
+        LimitedExpiredObjectList keyList, LimitedExpiredObjectList dirList,
+        OmLifecycleScanState.Builder scanStateBuilder) {
       String volumeName = bucket.getVolumeName();
       String bucketName = bucket.getBucketName();
       LimitedSizeStack stack = new LimitedSizeStack(cachedDirMaxCount);
+      String lastScannedDirInState = scanStateBuilder == null ? null : 
scanStateBuilder.getLastScannedDir();
+      String lastScannedDirKeyInState = scanStateBuilder == null ? null : 
scanStateBuilder.getLastScannedDirKey();
+      String lastScannedKeyInState = scanStateBuilder == null ? null : 
scanStateBuilder.getLastScannedKey();
+      DirectoryList lastScannedDirList = null;
+      if (lastScannedDirInState != null && lastScannedDirKeyInState != null) {
+        // find all parents of lastScannedSubDir
+        try {
+          lastScannedDirList = getDirList(volumeObjId, bucket, 
lastScannedDirInState, bucketName);
+        } catch (IOException e) {
+          LOG.info("Failed to get DirList for lastScannedDirInState", e);
+          lastScannedDirInState = null;
+        }

Review Comment:
   `LOG.info("Failed to get DirList for lastScannedDirInState", e);` passes an 
exception as the 2nd arg without a corresponding `{}` placeholder; SLF4J 
handles this correctly, but a failure to resume a scan pointer should likely be 
logged at `WARN` (consistent with surrounding `LOG.warn` calls for similar 
failures in this file), not `INFO`. Also include the actual 
`lastScannedDirInState` value in the message.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -721,53 +899,80 @@ private SubDirectorySummary getSubDirectory(long 
dirObjID, String prefix, OMMeta
             continue;
           }
           if (dir.getParentObjectID() == dirObjID) {
-            subDirList.addSubDir(dir);
+            subDirList.addSubDir(entry.getKey(), dir);
           }
         }
       }
       return subDirList;
     }
 
     private void evaluateBucket(OmBucketInfo bucketInfo,
-        Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList, 
LimitedExpiredObjectList expiredKeyList) {
+        Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList, 
LimitedExpiredObjectList expiredKeyList,
+        OmLifecycleScanState.Builder scanStateBuilder) {
       String volumeName = bucketInfo.getVolumeName();
       String bucketName = bucketInfo.getBucketName();
+      String bucketPrefix = omMetadataManager.getBucketKey(volumeName, 
bucketName);
 
-      // use bucket name as key iterator prefix
       try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> 
keyTblItr =
-               keyTable.iterator(omMetadataManager.getBucketKey(volumeName, 
bucketName))) {
+               keyTable.iterator(bucketPrefix)) {
+        if (scanStateBuilder != null && scanStateBuilder.getLastScannedKey() 
!= null) {
+          keyTblItr.seek(scanStateBuilder.getLastScannedKey());
+          // Skip the exact match since it was already processed
+          if (keyTblItr.hasNext()) {
+            Table.KeyValue<String, OmKeyInfo> first = keyTblItr.next();
+            if (!first.getKey().equals(scanStateBuilder.getLastScannedKey())) {
+              // We seeked past it, so we need to process this one.
+              // We can't easily "push back" in TableIterator, so we handle it 
here.
+              processKey(bucketInfo, first.getValue(), ruleList, 
expiredKeyList, scanStateBuilder);
+              numKeyIterated++;
+              lastScannedKey = first.getKey();
+              LOG.info("lastScannedKey3 {}", lastScannedKey);
+            }
+          }
+        }

Review Comment:
   In `evaluateBucket`, after `keyTblItr.seek(lastScannedKey)`, the code calls 
`keyTblItr.next()` and processes the entry if it does not match 
`lastScannedKey` — but it does not verify that the returned key still starts 
with `bucketPrefix`. If `lastScannedKey` is stale (e.g., refers to a key past 
the end of this bucket's range), `seek` may land on a key in a subsequent 
bucket and `processKey` will be invoked on a key from another bucket. Please 
add a `first.getKey().startsWith(bucketPrefix)` guard before processing.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -479,30 +520,82 @@ private void evaluateFSOBucket(OmVolumeArgs volume, 
OmBucketInfo bucket, String
             }
           }
         } else {
-          evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null,
-              Arrays.asList(rule), expiredKeyList, expiredDirList);
+          evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null, "",
+              Arrays.asList(rule), expiredKeyList, expiredDirList, 
scanStateBuilder);
         }
       }
 
       if (!noPrefixRuleList.isEmpty()) {
-        evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null,
-            noPrefixRuleList, expiredKeyList, expiredDirList);
+        evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null, "",
+            noPrefixRuleList, expiredKeyList, expiredDirList, 
scanStateBuilder);
+      }
+    }
+
+    private boolean canSkipDir(OmDirectoryInfo currentDir, String 
currentDirTableKey, DirectoryList dirList) {
+      // currentDir null is bucket root
+      if (currentDir == null) {
+        return false;
+      }
+
+      int count = dirList.getSubDirCount();
+      // if currentDir is equal to lastScannedDir
+      if (currentDir.getObjectID() == dirList.getSubDirList().get(count - 
1).getObjectID()) {
+        return false;
+      }
+
+      // if currentDir is parent of lastScannedDir
+      long currentObjID = currentDir.getObjectID();
+      for (int i = 0; i < count; i++) {
+        OmDirectoryInfo dir = dirList.getSubDirList().get(i);
+        if (dir.getObjectID() == currentObjID) {
+          return false;
+        }
       }
+
+      // if currentDir and lastScannedDir has same parent
+      do {
+        long parentID = currentDir.getParentObjectID();
+        for (int i = 0; i < count; i++) {
+          OmDirectoryInfo dir = dirList.getSubDirList().get(i);
+          if (dir.getParentObjectID() == parentID) {
+            if 
(dirList.getSubDirKeyList().get(i).compareTo(currentDirTableKey) < 0) {
+              return true;
+            }
+          }
+        }
+        return false;
+      } while(true);

Review Comment:
   The `do { ... return false; } while (true);` loop will always execute 
exactly once and exit via `return false`. The `do/while` is misleading — it 
suggests a multi-iteration loop walking up parent pointers (as the design doc 
describes), but only the immediate parent of `currentDir` is ever compared. 
Either remove the loop construct (since it never iterates) or, if walking up 
ancestors was the intent, advance `currentDir = directoryTable.get(parent)` 
inside the loop. As written, the logic appears incomplete and the loop scaffold 
is dead code.



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java:
##########
@@ -277,6 +286,423 @@ void testAllKeyExpired(BucketLayout bucketLayout, boolean 
createPrefix) throws I
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
+    @ParameterizedTest
+    @MethodSource("parameters1")
+    void testScanStatePiggybackedOnDelete(BucketLayout bucketLayout, boolean 
createPrefix) throws IOException,
+        TimeoutException, InterruptedException {
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      String prefix = "key";
+      long initialDeletedKeyCount = getDeletedKeyCount();
+      long initialKeyCount = getKeyCount(bucketLayout);
+      // create keys
+      List<OmKeyArgs> keyList =
+          createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, null);
+      // check there are keys in keyTable
+      assertEquals(KEY_COUNT, keyList.size());
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
+      // create Lifecycle configuration
+      ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+      ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+      
+      if (createPrefix) {
+        createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix, 
null, date.toString(), true);
+      } else {
+        OmLCFilter.Builder filter = getOmLCFilterBuilder(prefix, null, null);
+        createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
+      }
+
+      // Wait for deletion
+      GenericTestUtils.waitFor(() ->
+          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
+      assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
+
+      // Verify that scan state was updated through the DeleteKeysRequest
+      String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+      OmLifecycleScanState scanState = 
metadataManager.getLifecycleScanStateTable().get(bucketKey);
+      assertNotNull(scanState);
+      assertNotNull(scanState.getScanEndTime());
+
+      deleteLifecyclePolicy(volumeName, bucketName);
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters1")
+    void testBucketScanResume(BucketLayout bucketLayout, boolean createPrefix) 
throws IOException,
+        TimeoutException, InterruptedException {
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      String prefix = "key";
+      long initialDeletedKeyCount = getDeletedKeyCount();
+      long initialKeyCount = getKeyCount(bucketLayout);
+      int testKeyCount = 5;
+
+      // Suspend service so it doesn't process immediately after we create the 
policy
+      keyLifecycleService.suspend();
+
+      // create keys
+      List<OmKeyArgs> keyList =
+          createKeys(volumeName, bucketName, bucketLayout, testKeyCount, 1, 
prefix, null);
+      assertEquals(testKeyCount, keyList.size());
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == testKeyCount,
+          WAIT_CHECK_INTERVAL, 1000);
+
+      // determine db keys
+      List<String> dbKeys = new ArrayList<>();
+      long bucketId =
+          
metadataManager.getBucketTable().get(metadataManager.getBucketKey(volumeName, 
bucketName)).getObjectID();
+      long volumeId = 
metadataManager.getVolumeTable().get(metadataManager.getVolumeKey(volumeName)).getObjectID();
+
+      for (OmKeyArgs args : keyList) {
+        if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+          dbKeys.add(metadataManager.getOzonePathKey(volumeId, bucketId, 
bucketId, args.getKeyName()));
+        } else {
+          dbKeys.add(metadataManager.getOzoneKey(volumeName, bucketName, 
args.getKeyName()));
+        }
+      }
+      Collections.sort(dbKeys);
+      String lastScannedDbKey = dbKeys.get(2); // The 3rd key
+
+      // create Lifecycle configuration
+      ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+      ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+      if (createPrefix) {
+        createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix, 
null, date.toString(), true);
+      } else {
+        OmLCFilter.Builder filter = getOmLCFilterBuilder(prefix, null, null);
+        createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
+      }
+
+      // inject the resume state
+      String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+
+      OmLifecycleConfiguration policy = 
metadataManager.getLifecycleConfiguration(volumeName, bucketName);
+      OmLifecycleScanState.Builder stateBuilder = new 
OmLifecycleScanState.Builder()
+          .setBucketKey(bucketKey)
+          .setBucketObjID(bucketId)
+          .setLifecycleConfigurationUpdateID(policy.getUpdateID())
+          .setScanStartTime(System.currentTimeMillis());
+
+      if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+        stateBuilder.setLastScannedKey(lastScannedDbKey);
+        stateBuilder.setLastScannedDir("");
+      } else {
+        stateBuilder.setLastScannedKey(lastScannedDbKey);
+      }
+      OmLifecycleScanState scanState = stateBuilder.build();
+      metadataManager.getLifecycleScanStateTable().put(bucketKey, scanState);
+      metadataManager.getLifecycleScanStateTable().addCacheEntry(new 
CacheKey<>(bucketKey),
+          CacheValue.get(1L, scanState));
+      OmLifecycleScanState state = 
metadataManager.getLifecycleScanStateTable().get(bucketKey);
+      assertNotNull(state);
+
+      // resume the service
+      keyLifecycleService.resume();
+
+      // wait for it to process
+      // it should skip the first 3 keys (index 0, 1, 2) since we set 
lastScannedDbKey as index 2.
+      // So it deletes only the last 2 keys (index 3 and 4).
+      int expectedDeleted = 2;
+      GenericTestUtils.waitFor(() ->
+          (getDeletedKeyCount() - initialDeletedKeyCount) >= expectedDeleted, 
WAIT_CHECK_INTERVAL, 10000);
+      
+      // confirm it hasn't deleted all keys
+      assertEquals(testKeyCount - expectedDeleted, getKeyCount(bucketLayout) - 
initialKeyCount);
+      deleteLifecyclePolicy(volumeName, bucketName);
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters1")
+    void testBucketScanWithScanEndTime(BucketLayout bucketLayout, boolean 
createPrefix) throws IOException,
+        TimeoutException, InterruptedException {
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      String prefix = "key";
+      long initialDeletedKeyCount = getDeletedKeyCount();
+      long initialKeyCount = getKeyCount(bucketLayout);
+      int testKeyCount = 5;
+
+      // Suspend service so it doesn't process immediately after we create the 
policy
+      keyLifecycleService.suspend();
+
+      // create keys
+      List<OmKeyArgs> keyList =
+          createKeys(volumeName, bucketName, bucketLayout, testKeyCount, 1, 
prefix, null);
+      assertEquals(testKeyCount, keyList.size());
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == testKeyCount,
+          WAIT_CHECK_INTERVAL, 1000);
+
+      // determine db keys
+      List<String> dbKeys = new ArrayList<>();
+      long bucketId =
+          
metadataManager.getBucketTable().get(metadataManager.getBucketKey(volumeName, 
bucketName)).getObjectID();
+      long volumeId = 
metadataManager.getVolumeTable().get(metadataManager.getVolumeKey(volumeName)).getObjectID();
+
+      for (OmKeyArgs args : keyList) {
+        if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+          dbKeys.add(metadataManager.getOzonePathKey(volumeId, bucketId, 
bucketId, args.getKeyName()));
+        } else {
+          dbKeys.add(metadataManager.getOzoneKey(volumeName, bucketName, 
args.getKeyName()));
+        }
+      }
+      Collections.sort(dbKeys);
+      String lastScannedDbKey = dbKeys.get(2); // The 3rd key
+
+      // create Lifecycle configuration
+      ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+      ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+      if (createPrefix) {
+        createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix, 
null, date.toString(), true);
+      } else {
+        OmLCFilter.Builder filter = getOmLCFilterBuilder(prefix, null, null);
+        createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
+      }
+
+      // inject the resume state but with ScanEndTime set!
+      String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+      OmLifecycleConfiguration policy = 
metadataManager.getLifecycleConfiguration(volumeName, bucketName);
+      OmLifecycleScanState.Builder stateBuilder = new 
OmLifecycleScanState.Builder()
+          .setBucketKey(bucketKey)
+          .setBucketObjID(bucketId)
+          .setLifecycleConfigurationUpdateID(policy.getUpdateID())
+          .setScanStartTime(System.currentTimeMillis())
+          .setScanEndTime(System.currentTimeMillis()); // Scan finished 
previously
+
+      if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+        stateBuilder.setLastScannedKey(lastScannedDbKey);
+        stateBuilder.setLastScannedDir("");
+      } else {
+        stateBuilder.setLastScannedKey(lastScannedDbKey);
+      }
+      OmLifecycleScanState scanState = stateBuilder.build();
+      metadataManager.getLifecycleScanStateTable().put(bucketKey, scanState);
+      metadataManager.getLifecycleScanStateTable().addCacheEntry(new 
CacheKey<>(bucketKey),
+          CacheValue.get(1L, scanState));
+
+      // resume the service
+      keyLifecycleService.resume();
+
+      // wait for it to process
+      // Since ScanEndTime is set, it will ignore the lastScannedKey and start 
from the beginning!
+      // So it should delete ALL 5 keys.
+      int expectedDeleted = 5;
+      GenericTestUtils.waitFor(() ->
+          (getDeletedKeyCount() - initialDeletedKeyCount) >= expectedDeleted, 
WAIT_CHECK_INTERVAL, 10000);
+      
+      // confirm it has deleted all keys
+      assertEquals(testKeyCount - expectedDeleted, getKeyCount(bucketLayout) - 
initialKeyCount);
+
+      deleteLifecyclePolicy(volumeName, bucketName);
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters1")
+    void testScanEmptyBucket(BucketLayout bucketLayout, boolean moveToTrash) 
throws Exception {
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      
+      keyLifecycleService.setMoveToTrashEnabled(moveToTrash);
+      
+      // Create empty bucket
+      createVolumeAndBucket(volumeName, bucketName, bucketLayout,
+          UserGroupInformation.getCurrentUser().getShortUserName());
+          
+      // create Lifecycle configuration
+      ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+      ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+      OmLCFilter.Builder filter = getOmLCFilterBuilder("key", null, null);
+      createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
+
+      // Wait until scan completes. Since the bucket is empty, it will scan 
immediately.
+      // We check if the scanEndTime is set.
+      String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+      
+      GenericTestUtils.waitFor(() -> {
+        try {
+          OmLifecycleScanState scanState = 
metadataManager.getLifecycleScanStateTable().get(bucketKey);
+          return scanState != null && scanState.getScanEndTime() != null;
+        } catch (IOException e) {
+          return false;
+        }
+      }, WAIT_CHECK_INTERVAL, 10000);
+
+      deleteLifecyclePolicy(volumeName, bucketName);
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters1")
+    void testScanStateFailureDoesNotImpactScan(BucketLayout bucketLayout, 
boolean createPrefix) throws Exception {
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      String prefix = "key";
+      long initialDeletedKeyCount = getDeletedKeyCount();
+      long initialKeyCount = getKeyCount(bucketLayout);
+      // create keys
+      List<OmKeyArgs> keyList =
+          createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, null);
+      assertEquals(KEY_COUNT, keyList.size());
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
+
+      // Inject failure to LifecycleScanStateTable
+      Field tableField = 
OmMetadataManagerImpl.class.getDeclaredField("lifecycleScanStateTable");
+      tableField.setAccessible(true);
+      Table<String, OmLifecycleScanState> originalTable =
+          (Table<String, OmLifecycleScanState>) 
tableField.get(metadataManager);
+      Table<String, OmLifecycleScanState> spyTable = spy(originalTable);
+      doThrow(new RocksDatabaseException("Injected exception for 
testing")).when(spyTable).get(any());
+      doThrow(new RocksDatabaseException("Injected exception for 
testing")).when(spyTable).put(any(), any());
+      tableField.set(metadataManager, spyTable);
+
+      try {
+        // create Lifecycle configuration
+        ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+        ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+        if (createPrefix) {
+          createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix, 
null, date.toString(), true);
+        } else {
+          OmLCFilter.Builder filter = getOmLCFilterBuilder(prefix, null, null);
+          createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
+        }
+
+        // Even though reading scan state fails, the deletion should still 
proceed normally.
+        GenericTestUtils.waitFor(() ->
+            (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
+        assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
+
+        deleteLifecyclePolicy(volumeName, bucketName);
+      } finally {
+        // Restore original table
+        tableField.set(metadataManager, originalTable);
+      }
+    }
+
+    public Stream<Arguments> parameters11() {
+      return Stream.of(
+          arguments(FILE_SYSTEM_OPTIMIZED, 2),
+          arguments(FILE_SYSTEM_OPTIMIZED, 3),
+          arguments(FILE_SYSTEM_OPTIMIZED, 7),
+          arguments(BucketLayout.OBJECT_STORE, 2),
+          arguments(BucketLayout.OBJECT_STORE, 3),
+          arguments(BucketLayout.OBJECT_STORE, 7)
+      );
+    }

Review Comment:
   `parameters11` and `parameters12` are unclear MethodSource names. Consider 
renaming to something descriptive of what they parameterize (e.g., 
`nestedFsoScanParameters`, `directorySkipParameters`) to make tests easier to 
understand and maintain. Similar to the existing `parameters1` (which is 
already a poor name), but new additions should not perpetuate the pattern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to