xichen01 commented on code in PR #10412:
URL: https://github.com/apache/ozone/pull/10412#discussion_r3425968112
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -401,108 +475,226 @@ public BackgroundTaskResult call() {
@SuppressWarnings("checkstyle:parameternumber")
private void evaluateFSOBucket(OmVolumeArgs volume, OmBucketInfo bucket,
String bucketKey,
Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList,
- LimitedExpiredObjectList expiredKeyList, LimitedExpiredObjectList
expiredDirList) {
+ LimitedExpiredObjectList expiredKeyList, LimitedExpiredObjectList
expiredDirList,
+ OmLifecycleScanState.Builder scanStateBuilder) {
List<OmLCRule> prefixRuleList =
ruleList.stream().filter(r ->
r.isPrefixEnable()).collect(Collectors.toList());
// r.isPrefixEnable() == false means empty filter
List<OmLCRule> noPrefixRuleList =
ruleList.stream().filter(r ->
!r.isPrefixEnable()).collect(Collectors.toList());
- for (OmLCRule rule : prefixRuleList) {
- // find KeyInfo of each directory for prefix
- List<OmDirectoryInfo> dirList;
+ if (!noPrefixRuleList.isEmpty()) {
+ // evaluate all rules against each key
+ prefixRuleList.addAll(noPrefixRuleList);
+ evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "",
null, "",
+ prefixRuleList, expiredKeyList, expiredDirList, scanStateBuilder);
+ return;
+ }
+
+ List<RuleListWithDirectoryList> unionPrefixRuleList =
+ getRuleUnion(volume.getObjectID(), bucket, prefixRuleList,
bucketKey);
+
+ if (unionPrefixRuleList != null) {
+ if (unionPrefixRuleList.isEmpty()) {
+ // fallback to evaluate the whole bucket
+ evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "",
null, "",
+ prefixRuleList, expiredKeyList, expiredDirList,
scanStateBuilder);
+ } else {
+ for (RuleListWithDirectoryList ruleWithDirList :
unionPrefixRuleList) {
+ List<OmLCRule> rules = ruleWithDirList.getRuleList();
+ DirectoryList dir = ruleWithDirList.getDirList();
+ evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable,
dir.getLastSubDirPath(),
+ dir.getLastSubDir(), dir.getLastSubDirKey(),
+ rules, expiredKeyList, expiredDirList, scanStateBuilder);
+ }
+ }
+ }
+ }
+
+ /**
+ * Finds the directory union list from a list of prefixes and sorts them
+ * according to the FSO depth-first iteration order.
+ */
+ private List<RuleListWithDirectoryList> getRuleUnion(long volumeId,
OmBucketInfo bucket,
+ List<OmLCRule> rules, String bucketKey) {
+
+ if (rules.isEmpty() || rules.stream().anyMatch(
+ r -> r.getEffectivePrefix() == null ||
r.getEffectivePrefix().isEmpty())) {
+ // The union of anything with the root is just the root itself.
+ return new ArrayList();
+ }
+
+ List<RuleListWithDirectoryList> effectiveRuleList = new ArrayList<>();
+ for (OmLCRule rule : rules) {
+ String prefix = rule.getEffectivePrefix();
+ // Resolve each prefix to actual FSO directories in the DB
try {
- dirList = getDirList(volume, bucket, rule.getEffectivePrefix(),
bucketKey);
+ if (!prefix.endsWith(OzoneConsts.OM_KEY_PREFIX)) {
+ // FSO bucket doesn't allow prefix without tailing '/'
+ // Prefix ends with a slash, it explicitly refers to a directory
(e.g. "log/")
+ LOG.warn("Skip rule {} since FILE_SYSTEM_OPTIMIZED bucket prefix
must end with '/'", rule);
+ continue;
+ }
+
+ // Normalize by removing the trailing slash for uniform comparison
+ String normalizedPrefix = prefix.substring(0, prefix.length() - 1);
+ DirectoryList dirList = getDirList(volumeId, bucket,
normalizedPrefix, bucketKey);
+ // If the prefix is log/, and "log" dir really exists, then the
matched dir is "log".
+ // Otherwise, this rule doesn't match any dir/file in this FSO
bucket, this rule can be skipped.
+ if (!dirList.isEmpty() && dirList.isAllResolvedPrefix()) {
+ RuleListWithDirectoryList ruleListWithDirectoryList = new
RuleListWithDirectoryList(
+ Collections.singletonList(rule), dirList, prefix);
+ effectiveRuleList.add(ruleListWithDirectoryList);
+ }
} catch (IOException e) {
- LOG.warn("Skip rule {} as its prefix doesn't have all directory
exist", rule);
- // skip this rule if some directory doesn't exist for this rule's
prefix
+ // Directory doesn't exist or IO error, skip this rule
+ LOG.warn("Skip to evaluate rule {} due to failed to resolve prefix
{} for bucket {}",
+ rule, prefix, bucketKey, e);
+ }
+ }
+
+ if (effectiveRuleList.isEmpty()) {
+ // there is no valid rule found, either prefix doesn't end with "/",
+ // or any directory along the prefix cannot be found.
+ LOG.warn("Prefix of all rules of bucket {} cannot be resolved to an
existing directory. ", bucketKey);
+ return null;
+ }
+
+ if (effectiveRuleList.size() == 1) {
+ return effectiveRuleList;
+ }
+
+ // Find if one rule's prefix is the sub string of another rule's prefix.
+ // e.g.
+ // dir1/dir2/, dir1/dir2/dir3/, dir1/ -> dir1/
+ // dir1/dir2/, dir1/dir3/, dir1/dir4/ -> dir1/dir2/, dir1/dir3/,
dir1/dir4dir1/
+ // dir1/dir2/dir3/, dir1/dir2/, dir2/ -> dir1/dir2/, dir2/
+ // dir1/dir2/, dir1/dir3/, dir1/ -> dir1/
+ List<RuleListWithDirectoryList> consolidatedRules = new ArrayList<>();
+ Set<OmLCRule> skipEvaluatedRuleList = new HashSet<>();
+ for (int i = 0; i < effectiveRuleList.size(); i++) {
+ OmLCRule rule = effectiveRuleList.get(i).getRuleList().get(0);
+ if (skipEvaluatedRuleList.contains(rule)) {
continue;
}
- StringBuffer lastDirPath = new StringBuffer();
- OmDirectoryInfo lastDir = null;
- if (!dirList.isEmpty()) {
- lastDir = dirList.get(dirList.size() - 1);
- for (int i = 0; i < dirList.size(); i++) {
- lastDirPath.append(dirList.get(i).getName());
- if (i != dirList.size() - 1) {
- lastDirPath.append(OM_KEY_PREFIX);
- }
- }
- if (lastDirPath.toString().startsWith(TRASH_PREFIX)) {
- LOG.info("Skip evaluate trash directory {}", lastDirPath);
- } else {
- evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable,
lastDirPath.toString(), lastDir,
- Arrays.asList(rule), expiredKeyList, expiredDirList);
+
+ RuleListWithDirectoryList consolidatedCandidate = new
RuleListWithDirectoryList();
+ String consolidatedPrefix =
effectiveRuleList.get(i).getConsolidatedPrefix();
+ String finalRuleIndexID = rule.getId();
+ DirectoryList finalDirList = effectiveRuleList.get(i).getDirList();
+ for (int j = i + 1; j < effectiveRuleList.size(); j++) {
+ OmLCRule otherRule = effectiveRuleList.get(j).getRuleList().get(0);
+ if (skipEvaluatedRuleList.contains(otherRule)) {
+ continue;
}
- if (!rule.getEffectivePrefix().endsWith(OM_KEY_PREFIX)) {
- // if the prefix doesn't end with "/", then also search and
evaluate the directory itself
- // for example, "dir1/dir2" matches both directory "dir1/dir2" and
"dir1/dir22"
- // or "dir1" matches both directory "dir1" and "dir11"
- long objID;
- String objPrefix;
- String objPath;
- if (dirList.size() > 1) {
- OmDirectoryInfo secondLastDir = dirList.get(dirList.size() - 2);
- objID = secondLastDir.getObjectID();
- objPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX
+ bucket.getObjectID() +
- OM_KEY_PREFIX + secondLastDir.getObjectID();
- StringBuffer secondLastDirPath = new StringBuffer();
- for (int i = 0; i < dirList.size() - 1; i++) {
- secondLastDirPath.append(dirList.get(i).getName());
- if (i != dirList.size() - 2) {
- secondLastDirPath.append(OM_KEY_PREFIX);
- }
- }
- objPath = secondLastDirPath.toString();
- } else {
- objID = bucket.getObjectID();
- objPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX
+ bucket.getObjectID() +
- OM_KEY_PREFIX + bucket.getObjectID();
- objPath = "";
- }
- try {
- SubDirectorySummary subDirSummary = getSubDirectory(objID,
objPrefix, omMetadataManager);
- for (OmDirectoryInfo subDir : subDirSummary.getSubDirList()) {
- String subDirPath = objPath.isEmpty() ? subDir.getName() :
objPath + OM_KEY_PREFIX + subDir.getName();
- if (!subDir.getName().equals(TRASH_PREFIX) &&
subDirPath.startsWith(rule.getEffectivePrefix()) &&
- (lastDir == null || subDir.getObjectID() !=
lastDir.getObjectID())) {
- evaluateKeyAndDirTable(bucket, volume.getObjectID(),
keyTable, subDirPath, subDir,
- Arrays.asList(rule), expiredKeyList, expiredDirList);
- }
- }
- } catch (IOException e) {
- // log failure and continue the process
- LOG.warn("Failed to get sub directories of {} under {}/{}",
objPrefix,
- bucket.getVolumeName(), bucket.getBucketName(), e);
- return;
- }
+ DirectoryList otherDirList = effectiveRuleList.get(j).getDirList();
+ String otherPrefix = otherRule.getEffectivePrefix();
+ if (otherPrefix.startsWith(consolidatedPrefix)) {
+ LOG.info("Rule {}'s prefix {} is sub string of rule {}'s prefix
{}. " +
+ " Consolidate {} into {}.", otherRule.getId(),
otherPrefix, finalRuleIndexID,
+ consolidatedPrefix, otherRule.getId(), finalRuleIndexID);
+ consolidatedCandidate.addRule(otherRule);
+ skipEvaluatedRuleList.add(otherRule);
+ } else if (consolidatedPrefix.startsWith(otherPrefix)) {
+ LOG.info("Rule {}'s prefix {} is sub string of rule {}'s prefix
{}. Consolidate {} int {}. ",
+ consolidatedPrefix, consolidatedPrefix, otherRule.getId(),
otherPrefix, consolidatedPrefix,
+ otherRule.getId());
+ consolidatedPrefix = otherPrefix;
+ finalRuleIndexID = otherRule.getId();
+ finalDirList = otherDirList;
+ consolidatedCandidate.addRule(otherRule);
+ skipEvaluatedRuleList.add(otherRule);
}
- } else {
- evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "",
null,
- Arrays.asList(rule), expiredKeyList, expiredDirList);
}
+
+ consolidatedCandidate.addRule(rule);
+ consolidatedCandidate.setDirList(finalDirList);
+ consolidatedCandidate.setConsolidatedPrefix(consolidatedPrefix);
+ consolidatedRules.add(consolidatedCandidate);
}
- if (!noPrefixRuleList.isEmpty()) {
- evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "",
null,
- noPrefixRuleList, expiredKeyList, expiredDirList);
+ // Sort the list of paths lexicographically.
+ // FSO Depth-First Search order evaluates directories in lexicographical
order
+ // (since it retrieves entries from RocksDB sorted by name within the
same parent).
+ // Standard string sort on logical paths separated by "/" perfectly
matches this DFS order.
+ List<RuleListWithDirectoryList> sortedConsolidatedRules =
+ consolidatedRules.stream().sorted(new
RuleListWithDirectoryListOrder()).collect(Collectors.toList());
+
+ LOG.info("Final consolidated rules: " +
+
sortedConsolidatedRules.stream().map(RuleListWithDirectoryList::toString).collect(Collectors.joining(",
")));
+ if (test) {
+ consolidatedRuleList = sortedConsolidatedRules;
}
+ return sortedConsolidatedRules;
+ }
+
+ private boolean canSkipDir(OmDirectoryInfo currentDir, String
currentDirTableKey, DirectoryList dirList) {
Review Comment:
Rescanning isn't a frequent operation, and the directory structure may
change, so exact matching isn't necessary.
This simplifies the logic considerably:
- Record only the directory names: `lastScannedDir = "dir3/dir6/dir8"`
- Compare paths by directory name only
- Starting from the root and comparing segment by segment, if one side is a
prefix of the other (ancestor/descendant relationship), do not skip; otherwise,
skip.
For example:
```java
private boolean canSkip(String currentDirPath, String lastScannedDir) {
if (lastScannedDir == null || currentDirPath.isEmpty()) {
return false;
}
String[] cur = currentDirPath.split(OM_KEY_PREFIX);
String[] last = lastScannedDir.split(OM_KEY_PREFIX);
int n = Math.min(cur.length, last.length);
for (int i = 0; i < n; i++) {
int cmp = cur[i].compareTo(last[i]);
if (cmp != 0) {
return cmp > 0; // first branch:current name => last name ->
skip
}
}
return false;
}
```
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -154,6 +165,22 @@ public KeyLifecycleService(OzoneManager ozoneManager,
OZONE_KEY_LIFECYCLE_SERVICE_ENABLED_DEFAULT));
this.moveToTrashEnabled = new
AtomicBoolean(conf.getBoolean(OZONE_KEY_LIFECYCLE_SERVICE_MOVE_TO_TRASH_ENABLED,
OZONE_KEY_LIFECYCLE_SERVICE_MOVE_TO_TRASH_ENABLED_DEFAULT));
+ this.stateSaveIntervalMs =
conf.getLong(OZONE_KEY_LIFECYCLE_SERVICE_STATE_SAVE_INTERVAL_MS,
+ OZONE_KEY_LIFECYCLE_SERVICE_STATE_SAVE_INTERVAL_MS_DEFAULT);
+ if (!test && stateSaveIntervalMs <= 0) {
+ LOG.warn("Illegal value {} for Property {}. Set {} to {}",
stateSaveIntervalMs,
+ OZONE_KEY_LIFECYCLE_SERVICE_STATE_SAVE_INTERVAL_MS,
OZONE_KEY_LIFECYCLE_SERVICE_STATE_SAVE_INTERVAL_MS,
+ OZONE_KEY_LIFECYCLE_SERVICE_STATE_SAVE_INTERVAL_MS_DEFAULT);
+ maxKeysProcessedPerState =
OZONE_KEY_LIFECYCLE_SERVICE_STATE_SAVE_INTERVAL_MS_DEFAULT;
Review Comment:
This assignment should set `stateSaveIntervalMs`, not `maxKeysProcessedPerState`.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -721,53 +1025,106 @@ private SubDirectorySummary getSubDirectory(long
dirObjID, String prefix, OMMeta
continue;
}
if (dir.getParentObjectID() == dirObjID) {
- subDirList.addSubDir(dir);
+ subDirList.addSubDir(entry.getKey(), dir, dir.getName());
}
}
}
return subDirList;
}
+ private void flushAndSaveState(OmBucketInfo bucket,
LimitedExpiredObjectList expiredKeyList,
+ LimitedExpiredObjectList expiredDirList, OmLifecycleScanState.Builder
scanStateBuilder) {
+ boolean saved = false;
+ if (expiredKeyList != null && !expiredKeyList.isEmpty()) {
+ if (bucket.getBucketLayout() == OBJECT_STORE) {
+ sendDeleteKeysRequestAndClearList(bucket.getVolumeName(),
bucket.getBucketName(), expiredKeyList,
+ false, scanStateBuilder, false);
+ } else {
+ handleAndClearFullList(bucket, expiredKeyList, false,
scanStateBuilder, false);
+ }
+ saved = true;
+ }
+ if (expiredDirList != null && !expiredDirList.isEmpty()) {
+ if (bucket.getBucketLayout() != OBJECT_STORE) {
+ handleAndClearFullList(bucket, expiredDirList, true,
scanStateBuilder, false);
+ saved = true;
+ }
+ }
+ if (!saved) {
+ sendSaveScanStateRequest(scanStateBuilder, false);
+ }
+ lastStateSaveTime = Time.monotonicNow();
+ lastStateSaveKeyCount = numKeyIterated;
+ }
+
private void evaluateBucket(OmBucketInfo bucketInfo,
- Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList,
LimitedExpiredObjectList expiredKeyList) {
+ Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList,
LimitedExpiredObjectList expiredKeyList,
+ OmLifecycleScanState.Builder scanStateBuilder) {
String volumeName = bucketInfo.getVolumeName();
String bucketName = bucketInfo.getBucketName();
+ String bucketPrefix = omMetadataManager.getBucketKey(volumeName,
bucketName);
- // use bucket name as key iterator prefix
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyTblItr =
- keyTable.iterator(omMetadataManager.getBucketKey(volumeName,
bucketName))) {
Review Comment:
Would it be possible to call
`keyTblItr.seek(scanStateBuilder.getLastScannedKey())` directly and then
continue with the original logic?
Some repeated processing is acceptable, since the keyTable itself may
change too.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java:
##########
@@ -35,26 +36,29 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmLifecycleScanState;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
/**
* Response for DeleteKey request.
*/
-@CleanupTableInfo(cleanupTables = {KEY_TABLE, OPEN_KEY_TABLE, DELETED_TABLE,
BUCKET_TABLE})
+@CleanupTableInfo(cleanupTables = {KEY_TABLE, OPEN_KEY_TABLE, DELETED_TABLE,
BUCKET_TABLE, LIFECYCLE_SCAN_STATE_TABLE})
public class OMKeysDeleteResponse extends AbstractOMKeyDeleteResponse {
private List<OmKeyInfo> omKeyInfoList;
private OmBucketInfo omBucketInfo;
private Map<String, OmKeyInfo> openKeyInfoMap = new HashMap<>();
+ private OmLifecycleScanState scanState;
Review Comment:
This duplicates the `scanState` already held by the parent class (`AbstractOMKeyDeleteResponse`).
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java:
##########
@@ -162,6 +164,17 @@ public OMClientResponse
validateAndUpdateCache(OzoneManager ozoneManager, Execut
validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
String volumeOwner = getVolumeOwner(omMetadataManager, volumeName);
+ if (sourceType == RequestSource.LIFECYCLE &&
deleteKeyRequest.hasScanState()) {
+ if (ozoneManager.getAclsEnabled()) {
Review Comment:
We should check the ACL in `preExecute` rather than in `validateAndUpdateCache`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]