Copilot commented on code in PR #10412:
URL: https://github.com/apache/ozone/pull/10412#discussion_r3341275430
##########
HDDS-8342-Evaluation-And-Design.md:
##########
@@ -0,0 +1,90 @@
+# Evaluation of Branch HDDS-8342 & Design for Resumable Lifecycle Scans
+
+## 1. Evaluation of Current Implementation (HDDS-8342)
+
+The `HDDS-8342` branch introduces the `KeyLifecycleService`, a background
service running on the Ozone Manager (OM) Leader to enforce bucket lifecycle
rules (expiration, moving to trash, and aborting incomplete multipart uploads).
+
+### Strengths:
+- **Comprehensive Coverage:** Handles OBS/Legacy buckets via sequential key
iteration and FSO buckets via a Depth-First Search (DFS) to safely delete empty
directories.
+- **Batching:** Efficiently batches delete requests and submits them via
Ratis, respecting the `ratisByteLimit`.
+- **Fault Injection & Metrics:** Good use of metrics and fault injectors for
testing.
+
+### Identified Issues & Limitations:
+1. **Lack of Resumability:** The entire bucket is scanned in a single `call()`
execution. If the OM restarts, crashes, or a leader transfer occurs, the scan
state is lost. The new leader must restart the scan from the beginning. For
buckets with billions of keys, the scan may never complete if leader transfers
happen periodically.
+2. **Thread Starvation (No Yielding):** The `call()` method loops over the
entire bucket. This blocks the background service thread for a very long time,
preventing other buckets or background tasks from getting fair CPU time.
+3. **Redundant FSO Scanning:** FSO buckets are evaluated rule by rule (for
rules with prefixes). If multiple rules have different prefixes, the DFS
traverses overlapping paths, which is less efficient than a single unified pass.
+
+---
+
+## 2. Design: Persisting Bucket Scan Pointers
+
+To solve the resumability and thread starvation issues, we need to persist the
scan progress (the "pointer") to the OM DB. This ensures that a new OM leader
can resume from where the previous leader left off, and allows the task to
yield cooperatively.
+
+### 2.1 Data Structure for the Scan Pointer
+We will define a new Protobuf message `LifecycleScanStateProto` to capture the
exact position of the scan.
+
+```protobuf
+message LifecycleScanStateProto {
+ optional string bucketKey = 1; // e.g., /volume/bucket
+ optional uint64 scanStartTime = 2; // Epoch time when this full scan
started
+
+ // Phase indicator
+ enum ScanPhase {
+ OBS_KEY_SCAN = 1;
+ FSO_DFS_SCAN = 2;
+ MPU_SCAN = 3;
+ COMPLETED = 4;
+ }
+ optional ScanPhase phase = 3;
+
+ // For OBS/Legacy (Sequential Scan)
+ optional string lastScannedKey = 4;
+
+ // For FSO (DFS Scan)
+ optional uint64 currentRuleIndex = 5; // Which rule is currently being
evaluated
+ optional uint64 lastScannedDirObjId = 6; // The directory currently being
processed
+ optional string lastScannedFsoKey = 7; // The last key evaluated in the
current directory
+ optional string lastScannedSubDir = 8; // The last subdirectory
evaluated in the current directory
+
+ // For MPU Scan
+ optional string lastScannedMpuKey = 9;
+}
+```
+
+### 2.2 OM Database Schema Updates
+Add a new table to `OMMetadataManager` to store the scan states:
+- **Table Name:** `lifecycleStateTable`
+- **Key:** `bucketKey` (String, e.g., `/volumeName/bucketName`)
+- **Value:** `LifecycleScanState` (Parsed from `LifecycleScanStateProto`)
+
+### 2.3 When to Persist the Pointer (Checkpointing)
+Persisting the pointer for every key would overwhelm Ratis and RocksDB. We
should checkpoint periodically:
+
+1. **Piggybacking on Deletes:** Add an optional `LifecycleScanStateProto`
field to `DeleteKeysRequest` and `RenameKeyRequest` (when moving to trash).
When the OM state machine applies the deletion, it atomically updates the
`lifecycleStateTable` with the new pointer. This guarantees exactly-once
semantics for the scan pointer relative to deletions.
+2. **Periodic Standalone Checkpoints:** If no keys are expired (e.g., scanning
millions of valid keys), we still need to save progress. We introduce a new OM
request `SaveLifecycleScanStateRequest`. The `LifecycleActionTask` will send
this request periodically (e.g., every 100,000 keys iterated, or every 1 minute
of execution time).
+3. **End of Scan:** When the scan for a bucket finishes, a
`SaveLifecycleScanStateRequest` is sent to mark the phase as `COMPLETED` and
record the completion time.
+
+### 2.4 How to Resume the Scan
+When `KeyLifecycleService` schedules a `LifecycleActionTask` for a bucket, it
first reads the `LifecycleScanState` from the `lifecycleStateTable`.
+
+- **OBS/Legacy Resumption:**
+ The iterator for `keyTable` is initialized to seek to `lastScannedKey`
instead of the bucket prefix.
+ ```java
+ TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyTblItr
= keyTable.iterator(bucketPrefix);
+ if (state.getLastScannedKey() != null) {
+ keyTblItr.seek(state.getLastScannedKey());
+ // skip the exact match since it was already processed
+ }
+ ```
+
+- **FSO Resumption:**
+ Rebuilding the DFS stack is required. Using the `lastScannedDirObjId`, the
task queries the `directoryTable` to find the directory, then traverses its
parent pointers up to the bucket root to reconstruct the `LimitedSizeStack`.
+ Once the stack is rebuilt, the task resumes iterating keys in
`lastScannedDirObjId` starting from `lastScannedFsoKey`, and subdirectories
starting from `lastScannedSubDir`.
+
+- **MPU Resumption:**
+ The `multipartInfoTable` iterator seeks to `lastScannedMpuKey` and continues.
+
+### 2.5 Task Yielding (Cooperative Multitasking)
+With resumability in place, we can fix the thread starvation issue.
+- The `LifecycleActionTask` should track its execution time. After running for
a maximum time slice (e.g., 2 minutes) or processing a maximum number of keys,
the task saves its state via `SaveLifecycleScanStateRequest` and returns
`EmptyTaskResult`.
+- The `KeyLifecycleService` will re-schedule the bucket in the next queue
polling cycle, allowing other buckets or background tasks to get fair CPU time.
Review Comment:
This design document appears to have been accidentally committed at the
repository root. Design and evaluation documents should not be part of the
source tree, least of all at the repo root. Please remove this file from the PR.
##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java:
##########
@@ -117,6 +124,7 @@
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.OzoneTestBase;
import org.apache.ratis.util.ExitUtils;
+import org.eclipse.jetty.io.ByteBufferPool;
Review Comment:
Unused (and incorrect) import of `org.eclipse.jetty.io.ByteBufferPool`. This
class is not referenced anywhere in this test file and should be removed.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -319,76 +324,107 @@ public BackgroundTaskResult call() {
return result;
}
- List<OmLCRule> originRuleList = policy.getRules();
- // remove disabled rules
- List<OmLCRule> ruleList = originRuleList.stream().filter(r ->
r.isEnabled()).collect(Collectors.toList());
-
- List<OmLCRule> expirationRules = ruleList.stream()
- .filter(r -> r.getExpiration() != null)
- .collect(Collectors.toList());
- List<OmLCRule> mpuRules = ruleList.stream()
- .filter(r -> r.getAbortIncompleteMultipartUpload() != null)
- .collect(Collectors.toList());
-
- if (!expirationRules.isEmpty()) {
- LimitedExpiredObjectList expiredKeyList = new
LimitedExpiredObjectList(listMaxSize);
- LimitedExpiredObjectList expiredDirList = new
LimitedExpiredObjectList(listMaxSize);
- Table<String, OmKeyInfo> keyTable =
omMetadataManager.getKeyTable(bucket.getBucketLayout());
- /**
- * Filter treatment.
- * "" - all objects
- * "/" - if it's OBS/Legacy, means keys starting with "/"; If it's
FSO, not supported
- * "/key" - if it's OBS/Legacy, means keys starting with "/key", "/"
is literally "/";
- * If it's FSO, means keys or dirs starting with "key", "/"
will be treated as separator mark.
- * "key" - if it's OBS/Legacy, means keys starting with "key";
- * if it's FSO, means keys for dirs starting with "key" too.
- * "dir/" - if it's OBS/Legacy, means keys starting with "dir/";
- * - if it's FSO, means keys/dirs under directory "dir",
doesn't include directory "dir" itself.
- * - For FSO bucket, as directory ModificationTime will not
be updated when any of its child key/subdir
- * changes, so remember to add the tailing slash "/" when
configure prefix, otherwise the whole
- * directory will be expired and deleted once its
ModificationTime meats the condition.
- */
- if (bucket.getBucketLayout() == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
- OmVolumeArgs volume;
- try {
- volume =
omMetadataManager.getVolumeTable().get(omMetadataManager.getVolumeKey(bucket.getVolumeName()));
- if (volume == null) {
- LOG.warn("Volume {} cannot be found, might be deleted during
this task's execution",
- bucket.getVolumeName());
+ OmLifecycleScanState.Builder scanStateBuilder = null;
+ try {
+ OmLifecycleScanState scanState =
omMetadataManager.getLifecycleScanStateTable().get(bucketKey);
+ if (scanState == null || (scanState != null &&
(scanState.getBucketObjID() != bucket.getObjectID() ||
+ scanState.getLifecycleConfigurationUpdateID() !=
policy.getUpdateID() ||
+ scanState.getScanEndTime() != null))) {
Review Comment:
The condition `scanState == null || (scanState != null && (...))` has a
redundant `scanState != null` check inside the OR — once the first disjunct is
false, `scanState` is guaranteed non-null. Simplify to `scanState == null ||
scanState.getBucketObjID() != bucket.getObjectID() ||
scanState.getLifecycleConfigurationUpdateID() != policy.getUpdateID() ||
scanState.getScanEndTime() != null`.
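
The short-circuit behaviour can be checked in isolation. This is a minimal
sketch, not the PR's code: `ScanStateSketch` and `needsFreshScan` are
hypothetical names, and the primitive `long` ID fields are an assumption about
types that are not visible in this hunk.

```java
/** Hypothetical stand-in for OmLifecycleScanState; only the fields the check reads. */
final class ScanStateSketch {
  final long bucketObjId;
  final long configUpdateId;
  final Long scanEndTime; // null while the scan is still in progress

  ScanStateSketch(long bucketObjId, long configUpdateId, Long scanEndTime) {
    this.bucketObjId = bucketObjId;
    this.configUpdateId = configUpdateId;
    this.scanEndTime = scanEndTime;
  }

  /** True when the saved state cannot be resumed and the scan must start over. */
  static boolean needsFreshScan(ScanStateSketch state, long bucketObjId, long policyUpdateId) {
    // '||' short-circuits: every operand after the first only runs when state != null,
    // so no inner null check is needed.
    return state == null
        || state.bucketObjId != bucketObjId
        || state.configUpdateId != policyUpdateId
        || state.scanEndTime != null;
  }
}
```

If the real ID getters return boxed `Long` values, `!=` would compare
references, so `Objects.equals` would be the safer comparison there.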
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -479,30 +520,82 @@ private void evaluateFSOBucket(OmVolumeArgs volume,
OmBucketInfo bucket, String
}
}
} else {
- evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "",
null,
- Arrays.asList(rule), expiredKeyList, expiredDirList);
+ evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "",
null, "",
+ Arrays.asList(rule), expiredKeyList, expiredDirList,
scanStateBuilder);
}
}
if (!noPrefixRuleList.isEmpty()) {
- evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "",
null,
- noPrefixRuleList, expiredKeyList, expiredDirList);
+ evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "",
null, "",
+ noPrefixRuleList, expiredKeyList, expiredDirList,
scanStateBuilder);
+ }
+ }
+
+ private boolean canSkipDir(OmDirectoryInfo currentDir, String
currentDirTableKey, DirectoryList dirList) {
+ // currentDir null is bucket root
+ if (currentDir == null) {
+ return false;
+ }
+
+ int count = dirList.getSubDirCount();
+ // if currentDir is equal to lastScannedDir
+ if (currentDir.getObjectID() == dirList.getSubDirList().get(count -
1).getObjectID()) {
+ return false;
+ }
+
+ // if currentDir is parent of lastScannedDir
+ long currentObjID = currentDir.getObjectID();
+ for (int i = 0; i < count; i++) {
+ OmDirectoryInfo dir = dirList.getSubDirList().get(i);
+ if (dir.getObjectID() == currentObjID) {
+ return false;
+ }
}
+
+ // if currentDir and lastScannedDir has same parent
+ do {
+ long parentID = currentDir.getParentObjectID();
+ for (int i = 0; i < count; i++) {
+ OmDirectoryInfo dir = dirList.getSubDirList().get(i);
+ if (dir.getParentObjectID() == parentID) {
+ if
(dirList.getSubDirKeyList().get(i).compareTo(currentDirTableKey) < 0) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } while(true);
}
@SuppressWarnings({"checkstyle:parameternumber",
"checkstyle:MethodLength"})
private void evaluateKeyAndDirTable(OmBucketInfo bucket, long volumeObjId,
Table<String, OmKeyInfo> keyTable,
- String directoryPath, @Nullable OmDirectoryInfo dir, List<OmLCRule>
ruleList, LimitedExpiredObjectList keyList,
- LimitedExpiredObjectList dirList) {
+ String directoryPath, @Nullable OmDirectoryInfo dir, String dirKey,
List<OmLCRule> ruleList,
+ LimitedExpiredObjectList keyList, LimitedExpiredObjectList dirList,
+ OmLifecycleScanState.Builder scanStateBuilder) {
String volumeName = bucket.getVolumeName();
String bucketName = bucket.getBucketName();
LimitedSizeStack stack = new LimitedSizeStack(cachedDirMaxCount);
+ String lastScannedDirInState = scanStateBuilder == null ? null :
scanStateBuilder.getLastScannedDir();
+ String lastScannedDirKeyInState = scanStateBuilder == null ? null :
scanStateBuilder.getLastScannedDirKey();
+ String lastScannedKeyInState = scanStateBuilder == null ? null :
scanStateBuilder.getLastScannedKey();
+ DirectoryList lastScannedDirList = null;
+ if (lastScannedDirInState != null && lastScannedDirKeyInState != null) {
+ // find all parents of lastScannedSubDir
+ try {
+ lastScannedDirList = getDirList(volumeObjId, bucket,
lastScannedDirInState, bucketName);
+ } catch (IOException e) {
+ LOG.info("Failed to get DirList for lastScannedDirInState", e);
+ lastScannedDirInState = null;
+ }
Review Comment:
`LOG.info("Failed to get DirList for lastScannedDirInState", e);` passes the
exception as the last argument with no matching `{}` placeholder. SLF4J handles
this correctly and logs the stack trace, but a failure to resume from a scan
pointer should likely be logged at `WARN` (consistent with the surrounding
`LOG.warn` calls for similar failures in this file), not `INFO`. Also include
the actual `lastScannedDirInState` value in the message, for example
`LOG.warn("Failed to get DirList for lastScannedDir {}", lastScannedDirInState, e);`.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -721,53 +899,80 @@ private SubDirectorySummary getSubDirectory(long
dirObjID, String prefix, OMMeta
continue;
}
if (dir.getParentObjectID() == dirObjID) {
- subDirList.addSubDir(dir);
+ subDirList.addSubDir(entry.getKey(), dir);
}
}
}
return subDirList;
}
private void evaluateBucket(OmBucketInfo bucketInfo,
- Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList,
LimitedExpiredObjectList expiredKeyList) {
+ Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList,
LimitedExpiredObjectList expiredKeyList,
+ OmLifecycleScanState.Builder scanStateBuilder) {
String volumeName = bucketInfo.getVolumeName();
String bucketName = bucketInfo.getBucketName();
+ String bucketPrefix = omMetadataManager.getBucketKey(volumeName,
bucketName);
- // use bucket name as key iterator prefix
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyTblItr =
- keyTable.iterator(omMetadataManager.getBucketKey(volumeName,
bucketName))) {
+ keyTable.iterator(bucketPrefix)) {
+ if (scanStateBuilder != null && scanStateBuilder.getLastScannedKey()
!= null) {
+ keyTblItr.seek(scanStateBuilder.getLastScannedKey());
+ // Skip the exact match since it was already processed
+ if (keyTblItr.hasNext()) {
+ Table.KeyValue<String, OmKeyInfo> first = keyTblItr.next();
+ if (!first.getKey().equals(scanStateBuilder.getLastScannedKey())) {
+ // We seeked past it, so we need to process this one.
+ // We can't easily "push back" in TableIterator, so we handle it
here.
+ processKey(bucketInfo, first.getValue(), ruleList,
expiredKeyList, scanStateBuilder);
+ numKeyIterated++;
+ lastScannedKey = first.getKey();
+ LOG.info("lastScannedKey3 {}", lastScannedKey);
+ }
+ }
+ }
Review Comment:
In `evaluateBucket`, after `keyTblItr.seek(lastScannedKey)`, the code calls
`keyTblItr.next()` and processes the entry if it does not match
`lastScannedKey` — but it does not verify that the returned key still starts
with `bucketPrefix`. If `lastScannedKey` is stale (e.g., refers to a key past
the end of this bucket's range), `seek` may land on a key in a subsequent
bucket and `processKey` will be invoked on a key from another bucket. Please
add a `first.getKey().startsWith(bucketPrefix)` guard before processing.
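
The guard can be illustrated with a sorted map standing in for the key table.
This is a sketch under stated assumptions: `ResumeScanSketch` and
`keysToProcess` are hypothetical, a `TreeMap` replaces the RocksDB-backed
table, and it does not model whether `TableIterator` itself enforces the
prefix after `seek`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

/** Hypothetical sketch: resume a sorted scan without leaving the bucket's key range. */
final class ResumeScanSketch {
  static List<String> keysToProcess(NavigableMap<String, String> table,
      String bucketPrefix, String lastScannedKey) {
    // Start at the bucket prefix on a fresh scan; otherwise start strictly
    // after the last scanned key, since that key was already processed.
    String from = lastScannedKey == null ? bucketPrefix : lastScannedKey;
    boolean inclusive = lastScannedKey == null;
    List<String> result = new ArrayList<>();
    for (String key : table.tailMap(from, inclusive).keySet()) {
      if (!key.startsWith(bucketPrefix)) {
        break; // guard: the scan has run past this bucket's range
      }
      result.add(key);
    }
    return result;
  }
}
```

With a stale `lastScannedKey` that sorts after every key in the bucket, the
first key returned belongs to the next bucket and the guard stops the scan
before anything is processed.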
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -479,30 +520,82 @@ private void evaluateFSOBucket(OmVolumeArgs volume,
OmBucketInfo bucket, String
}
}
} else {
- evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "",
null,
- Arrays.asList(rule), expiredKeyList, expiredDirList);
+ evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "",
null, "",
+ Arrays.asList(rule), expiredKeyList, expiredDirList,
scanStateBuilder);
}
}
if (!noPrefixRuleList.isEmpty()) {
- evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "",
null,
- noPrefixRuleList, expiredKeyList, expiredDirList);
+ evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "",
null, "",
+ noPrefixRuleList, expiredKeyList, expiredDirList,
scanStateBuilder);
+ }
+ }
+
+ private boolean canSkipDir(OmDirectoryInfo currentDir, String
currentDirTableKey, DirectoryList dirList) {
+ // currentDir null is bucket root
+ if (currentDir == null) {
+ return false;
+ }
+
+ int count = dirList.getSubDirCount();
+ // if currentDir is equal to lastScannedDir
+ if (currentDir.getObjectID() == dirList.getSubDirList().get(count -
1).getObjectID()) {
+ return false;
+ }
+
+ // if currentDir is parent of lastScannedDir
+ long currentObjID = currentDir.getObjectID();
+ for (int i = 0; i < count; i++) {
+ OmDirectoryInfo dir = dirList.getSubDirList().get(i);
+ if (dir.getObjectID() == currentObjID) {
+ return false;
+ }
}
+
+ // if currentDir and lastScannedDir has same parent
+ do {
+ long parentID = currentDir.getParentObjectID();
+ for (int i = 0; i < count; i++) {
+ OmDirectoryInfo dir = dirList.getSubDirList().get(i);
+ if (dir.getParentObjectID() == parentID) {
+ if
(dirList.getSubDirKeyList().get(i).compareTo(currentDirTableKey) < 0) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } while(true);
Review Comment:
The `do { ... return false; } while (true);` loop will always execute
exactly once and exit via `return false`. The `do/while` is misleading — it
suggests a multi-iteration loop walking up parent pointers (as the design doc
describes), but only the immediate parent of `currentDir` is ever compared.
Either remove the loop construct (since it never iterates) or, if walking up
ancestors was the intent, advance `currentDir = directoryTable.get(parent)`
inside the loop. As written, the logic appears incomplete and the loop scaffold
is dead code.
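
If comparing only the immediate parent is the intended behaviour, the final
block reduces to a plain loop. A minimal sketch of that equivalent form:
`PathEntry` is a hypothetical stand-in for one `DirectoryList` entry (a
directory's parent object ID plus its table key), and `canSkipSibling` is a
hypothetical name.

```java
import java.util.List;

/** Hypothetical sketch of the last check in canSkipDir without the do/while scaffold. */
final class CanSkipDirSketch {
  /** One directory on the resume path: its parent object ID and its table key. */
  static final class PathEntry {
    final long parentObjectId;
    final String tableKey;

    PathEntry(long parentObjectId, String tableKey) {
      this.parentObjectId = parentObjectId;
      this.tableKey = tableKey;
    }
  }

  /**
   * Same result as the original do/while body: true when some resume-path
   * entry shares the current directory's parent and has a smaller table key.
   */
  static boolean canSkipSibling(long currentParentId, String currentDirTableKey,
      List<PathEntry> resumePath) {
    for (PathEntry entry : resumePath) {
      if (entry.parentObjectId == currentParentId
          && entry.tableKey.compareTo(currentDirTableKey) < 0) {
        return true;
      }
    }
    return false;
  }
}
```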
##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java:
##########
@@ -277,6 +286,423 @@ void testAllKeyExpired(BucketLayout bucketLayout, boolean
createPrefix) throws I
deleteLifecyclePolicy(volumeName, bucketName);
}
+ @ParameterizedTest
+ @MethodSource("parameters1")
+ void testScanStatePiggybackedOnDelete(BucketLayout bucketLayout, boolean
createPrefix) throws IOException,
+ TimeoutException, InterruptedException {
+ final String volumeName = getTestName();
+ final String bucketName = uniqueObjectName("bucket");
+ String prefix = "key";
+ long initialDeletedKeyCount = getDeletedKeyCount();
+ long initialKeyCount = getKeyCount(bucketLayout);
+ // create keys
+ List<OmKeyArgs> keyList =
+ createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1,
prefix, null);
+ // check there are keys in keyTable
+ assertEquals(KEY_COUNT, keyList.size());
+ GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) -
initialKeyCount == KEY_COUNT,
+ WAIT_CHECK_INTERVAL, 1000);
+ // create Lifecycle configuration
+ ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+ ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+
+ if (createPrefix) {
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix,
null, date.toString(), true);
+ } else {
+ OmLCFilter.Builder filter = getOmLCFilterBuilder(prefix, null, null);
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout, null,
filter.build(), date.toString(), true);
+ }
+
+ // Wait for deletion
+ GenericTestUtils.waitFor(() ->
+ (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT,
WAIT_CHECK_INTERVAL, 10000);
+ assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
+
+ // Verify that scan state was updated through the DeleteKeysRequest
+ String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+ OmLifecycleScanState scanState =
metadataManager.getLifecycleScanStateTable().get(bucketKey);
+ assertNotNull(scanState);
+ assertNotNull(scanState.getScanEndTime());
+
+ deleteLifecyclePolicy(volumeName, bucketName);
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters1")
+ void testBucketScanResume(BucketLayout bucketLayout, boolean createPrefix)
throws IOException,
+ TimeoutException, InterruptedException {
+ final String volumeName = getTestName();
+ final String bucketName = uniqueObjectName("bucket");
+ String prefix = "key";
+ long initialDeletedKeyCount = getDeletedKeyCount();
+ long initialKeyCount = getKeyCount(bucketLayout);
+ int testKeyCount = 5;
+
+ // Suspend service so it doesn't process immediately after we create the
policy
+ keyLifecycleService.suspend();
+
+ // create keys
+ List<OmKeyArgs> keyList =
+ createKeys(volumeName, bucketName, bucketLayout, testKeyCount, 1,
prefix, null);
+ assertEquals(testKeyCount, keyList.size());
+ GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) -
initialKeyCount == testKeyCount,
+ WAIT_CHECK_INTERVAL, 1000);
+
+ // determine db keys
+ List<String> dbKeys = new ArrayList<>();
+ long bucketId =
+
metadataManager.getBucketTable().get(metadataManager.getBucketKey(volumeName,
bucketName)).getObjectID();
+ long volumeId =
metadataManager.getVolumeTable().get(metadataManager.getVolumeKey(volumeName)).getObjectID();
+
+ for (OmKeyArgs args : keyList) {
+ if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+ dbKeys.add(metadataManager.getOzonePathKey(volumeId, bucketId,
bucketId, args.getKeyName()));
+ } else {
+ dbKeys.add(metadataManager.getOzoneKey(volumeName, bucketName,
args.getKeyName()));
+ }
+ }
+ Collections.sort(dbKeys);
+ String lastScannedDbKey = dbKeys.get(2); // The 3rd key
+
+ // create Lifecycle configuration
+ ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+ ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+ if (createPrefix) {
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix,
null, date.toString(), true);
+ } else {
+ OmLCFilter.Builder filter = getOmLCFilterBuilder(prefix, null, null);
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout, null,
filter.build(), date.toString(), true);
+ }
+
+ // inject the resume state
+ String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+
+ OmLifecycleConfiguration policy =
metadataManager.getLifecycleConfiguration(volumeName, bucketName);
+ OmLifecycleScanState.Builder stateBuilder = new
OmLifecycleScanState.Builder()
+ .setBucketKey(bucketKey)
+ .setBucketObjID(bucketId)
+ .setLifecycleConfigurationUpdateID(policy.getUpdateID())
+ .setScanStartTime(System.currentTimeMillis());
+
+ if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+ stateBuilder.setLastScannedKey(lastScannedDbKey);
+ stateBuilder.setLastScannedDir("");
+ } else {
+ stateBuilder.setLastScannedKey(lastScannedDbKey);
+ }
+ OmLifecycleScanState scanState = stateBuilder.build();
+ metadataManager.getLifecycleScanStateTable().put(bucketKey, scanState);
+ metadataManager.getLifecycleScanStateTable().addCacheEntry(new
CacheKey<>(bucketKey),
+ CacheValue.get(1L, scanState));
+ OmLifecycleScanState state =
metadataManager.getLifecycleScanStateTable().get(bucketKey);
+ assertNotNull(state);
+
+ // resume the service
+ keyLifecycleService.resume();
+
+ // wait for it to process
+ // it should skip the first 3 keys (index 0, 1, 2) since we set
lastScannedDbKey as index 2.
+ // So it deletes only the last 2 keys (index 3 and 4).
+ int expectedDeleted = 2;
+ GenericTestUtils.waitFor(() ->
+ (getDeletedKeyCount() - initialDeletedKeyCount) >= expectedDeleted,
WAIT_CHECK_INTERVAL, 10000);
+
+ // confirm it hasn't deleted all keys
+ assertEquals(testKeyCount - expectedDeleted, getKeyCount(bucketLayout) -
initialKeyCount);
+ deleteLifecyclePolicy(volumeName, bucketName);
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters1")
+ void testBucketScanWithScanEndTime(BucketLayout bucketLayout, boolean
createPrefix) throws IOException,
+ TimeoutException, InterruptedException {
+ final String volumeName = getTestName();
+ final String bucketName = uniqueObjectName("bucket");
+ String prefix = "key";
+ long initialDeletedKeyCount = getDeletedKeyCount();
+ long initialKeyCount = getKeyCount(bucketLayout);
+ int testKeyCount = 5;
+
+ // Suspend service so it doesn't process immediately after we create the
policy
+ keyLifecycleService.suspend();
+
+ // create keys
+ List<OmKeyArgs> keyList =
+ createKeys(volumeName, bucketName, bucketLayout, testKeyCount, 1,
prefix, null);
+ assertEquals(testKeyCount, keyList.size());
+ GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) -
initialKeyCount == testKeyCount,
+ WAIT_CHECK_INTERVAL, 1000);
+
+ // determine db keys
+ List<String> dbKeys = new ArrayList<>();
+ long bucketId =
+
metadataManager.getBucketTable().get(metadataManager.getBucketKey(volumeName,
bucketName)).getObjectID();
+ long volumeId =
metadataManager.getVolumeTable().get(metadataManager.getVolumeKey(volumeName)).getObjectID();
+
+ for (OmKeyArgs args : keyList) {
+ if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+ dbKeys.add(metadataManager.getOzonePathKey(volumeId, bucketId,
bucketId, args.getKeyName()));
+ } else {
+ dbKeys.add(metadataManager.getOzoneKey(volumeName, bucketName,
args.getKeyName()));
+ }
+ }
+ Collections.sort(dbKeys);
+ String lastScannedDbKey = dbKeys.get(2); // The 3rd key
+
+ // create Lifecycle configuration
+ ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+ ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+ if (createPrefix) {
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix,
null, date.toString(), true);
+ } else {
+ OmLCFilter.Builder filter = getOmLCFilterBuilder(prefix, null, null);
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout, null,
filter.build(), date.toString(), true);
+ }
+
+ // inject the resume state but with ScanEndTime set!
+ String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+ OmLifecycleConfiguration policy =
metadataManager.getLifecycleConfiguration(volumeName, bucketName);
+ OmLifecycleScanState.Builder stateBuilder = new
OmLifecycleScanState.Builder()
+ .setBucketKey(bucketKey)
+ .setBucketObjID(bucketId)
+ .setLifecycleConfigurationUpdateID(policy.getUpdateID())
+ .setScanStartTime(System.currentTimeMillis())
+ .setScanEndTime(System.currentTimeMillis()); // Scan finished
previously
+
+ if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+ stateBuilder.setLastScannedKey(lastScannedDbKey);
+ stateBuilder.setLastScannedDir("");
+ } else {
+ stateBuilder.setLastScannedKey(lastScannedDbKey);
+ }
+ OmLifecycleScanState scanState = stateBuilder.build();
+ metadataManager.getLifecycleScanStateTable().put(bucketKey, scanState);
+ metadataManager.getLifecycleScanStateTable().addCacheEntry(new
CacheKey<>(bucketKey),
+ CacheValue.get(1L, scanState));
+
+ // resume the service
+ keyLifecycleService.resume();
+
+ // wait for it to process
+ // Since ScanEndTime is set, it will ignore the lastScannedKey and start
from the beginning!
+ // So it should delete ALL 5 keys.
+ int expectedDeleted = 5;
+ GenericTestUtils.waitFor(() ->
+ (getDeletedKeyCount() - initialDeletedKeyCount) >= expectedDeleted,
WAIT_CHECK_INTERVAL, 10000);
+
+ // confirm it has deleted all keys
+ assertEquals(testKeyCount - expectedDeleted, getKeyCount(bucketLayout) -
initialKeyCount);
+
+ deleteLifecyclePolicy(volumeName, bucketName);
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters1")
+ void testScanEmptyBucket(BucketLayout bucketLayout, boolean moveToTrash)
throws Exception {
+ final String volumeName = getTestName();
+ final String bucketName = uniqueObjectName("bucket");
+
+ keyLifecycleService.setMoveToTrashEnabled(moveToTrash);
+
+ // Create empty bucket
+ createVolumeAndBucket(volumeName, bucketName, bucketLayout,
+ UserGroupInformation.getCurrentUser().getShortUserName());
+
+ // create Lifecycle configuration
+ ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+ ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+ OmLCFilter.Builder filter = getOmLCFilterBuilder("key", null, null);
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout, null,
filter.build(), date.toString(), true);
+
+ // Wait until scan completes. Since the bucket is empty, it will scan
immediately.
+ // We check if the scanEndTime is set.
+ String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+
+ GenericTestUtils.waitFor(() -> {
+ try {
+ OmLifecycleScanState scanState =
metadataManager.getLifecycleScanStateTable().get(bucketKey);
+ return scanState != null && scanState.getScanEndTime() != null;
+ } catch (IOException e) {
+ return false;
+ }
+ }, WAIT_CHECK_INTERVAL, 10000);
+
+ deleteLifecyclePolicy(volumeName, bucketName);
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters1")
+ void testScanStateFailureDoesNotImpactScan(BucketLayout bucketLayout,
boolean createPrefix) throws Exception {
+ final String volumeName = getTestName();
+ final String bucketName = uniqueObjectName("bucket");
+ String prefix = "key";
+ long initialDeletedKeyCount = getDeletedKeyCount();
+ long initialKeyCount = getKeyCount(bucketLayout);
+ // create keys
+ List<OmKeyArgs> keyList =
+ createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1,
prefix, null);
+ assertEquals(KEY_COUNT, keyList.size());
+ GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) -
initialKeyCount == KEY_COUNT,
+ WAIT_CHECK_INTERVAL, 1000);
+
+ // Inject failure to LifecycleScanStateTable
+ Field tableField =
OmMetadataManagerImpl.class.getDeclaredField("lifecycleScanStateTable");
+ tableField.setAccessible(true);
+ Table<String, OmLifecycleScanState> originalTable =
+ (Table<String, OmLifecycleScanState>)
tableField.get(metadataManager);
+ Table<String, OmLifecycleScanState> spyTable = spy(originalTable);
+ doThrow(new RocksDatabaseException("Injected exception for
testing")).when(spyTable).get(any());
+ doThrow(new RocksDatabaseException("Injected exception for
testing")).when(spyTable).put(any(), any());
+ tableField.set(metadataManager, spyTable);
+
+ try {
+ // create Lifecycle configuration
+ ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+ ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+ if (createPrefix) {
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix,
null, date.toString(), true);
+ } else {
+ OmLCFilter.Builder filter = getOmLCFilterBuilder(prefix, null, null);
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout, null,
filter.build(), date.toString(), true);
+ }
+
+ // Even though reading scan state fails, the deletion should still
proceed normally.
+ GenericTestUtils.waitFor(() ->
+ (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT,
WAIT_CHECK_INTERVAL, 10000);
+ assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
+
+ deleteLifecyclePolicy(volumeName, bucketName);
+ } finally {
+ // Restore original table
+ tableField.set(metadataManager, originalTable);
+ }
+ }
+
+ public Stream<Arguments> parameters11() {
+ return Stream.of(
+ arguments(FILE_SYSTEM_OPTIMIZED, 2),
+ arguments(FILE_SYSTEM_OPTIMIZED, 3),
+ arguments(FILE_SYSTEM_OPTIMIZED, 7),
+ arguments(BucketLayout.OBJECT_STORE, 2),
+ arguments(BucketLayout.OBJECT_STORE, 3),
+ arguments(BucketLayout.OBJECT_STORE, 7)
+ );
+ }
Review Comment:
`parameters11` and `parameters12` are unclear MethodSource names. Consider
renaming them to something descriptive of what they parameterize (e.g.,
`nestedFsoScanParameters`, `directorySkipParameters`) to make the tests easier
to understand and maintain. The existing `parameters1` is already a poor name,
and new additions should not perpetuate the pattern.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]