ivandika3 commented on code in PR #10052:
URL: https://github.com/apache/ozone/pull/10052#discussion_r3058122036
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -743,6 +750,156 @@ private void evaluateBucket(OmBucketInfo bucketInfo,
}
}
+ /**
+ * Process AbortIncompleteMultipartUpload actions for incomplete multipart
uploads.
+ * Iterates through the multipartInfoTable and aborts uploads that match
the rule criteria
+ * and have exceeded the configured days after initiation.
+ *
+ * @param bucketInfo the bucket information
+ * @param ruleList list of lifecycle rules to evaluate
+ */
+ private void processMultipartUploads(OmBucketInfo bucketInfo,
List<OmLCRule> ruleList) {
+ // Filter rules that have AbortIncompleteMultipartUpload actions
+ List<OmLCRule> mpuRules = ruleList.stream()
+ .filter(r -> r.getAbortIncompleteMultipartUpload() != null)
+ .collect(Collectors.toList());
+
+ if (mpuRules.isEmpty()) {
+ return;
+ }
+
+ String volumeName = bucketInfo.getVolumeName();
+ String bucketName = bucketInfo.getBucketName();
+ String bucketPrefix = omMetadataManager.getMultipartKey(volumeName,
bucketName, "", "");
+
+ LOG.debug("Processing AbortIncompleteMultipartUpload actions for bucket
{}/{}", volumeName, bucketName);
+
+ List<OmMultipartUpload> expiredUploads = new ArrayList<>();
+
+ try (TableIterator<String, ? extends Table.KeyValue<String,
OmMultipartKeyInfo>> mpuIterator =
+
omMetadataManager.getMultipartInfoTable().iterator(bucketPrefix)) {
+ while (mpuIterator.hasNext()) {
+ if (!shouldRun()) {
Review Comment:
Let's add metrics similar to `numKeyIterated` (e.g. a count of iterated
multipart uploads) for the abort-incomplete-multipart-upload case.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -743,6 +750,156 @@ private void evaluateBucket(OmBucketInfo bucketInfo,
}
}
+ /**
+ * Process AbortIncompleteMultipartUpload actions for incomplete multipart
uploads.
+ * Iterates through the multipartInfoTable and aborts uploads that match
the rule criteria
+ * and have exceeded the configured days after initiation.
+ *
+ * @param bucketInfo the bucket information
+ * @param ruleList list of lifecycle rules to evaluate
+ */
+ private void processMultipartUploads(OmBucketInfo bucketInfo,
List<OmLCRule> ruleList) {
+ // Filter rules that have AbortIncompleteMultipartUpload actions
+ List<OmLCRule> mpuRules = ruleList.stream()
+ .filter(r -> r.getAbortIncompleteMultipartUpload() != null)
+ .collect(Collectors.toList());
+
+ if (mpuRules.isEmpty()) {
+ return;
+ }
+
+ String volumeName = bucketInfo.getVolumeName();
+ String bucketName = bucketInfo.getBucketName();
+ String bucketPrefix = omMetadataManager.getMultipartKey(volumeName,
bucketName, "", "");
+
+ LOG.debug("Processing AbortIncompleteMultipartUpload actions for bucket
{}/{}", volumeName, bucketName);
+
+ List<OmMultipartUpload> expiredUploads = new ArrayList<>();
+
+ try (TableIterator<String, ? extends Table.KeyValue<String,
OmMultipartKeyInfo>> mpuIterator =
+
omMetadataManager.getMultipartInfoTable().iterator(bucketPrefix)) {
+ while (mpuIterator.hasNext()) {
+ if (!shouldRun()) {
+ LOG.info("KeyLifecycleService is suspended or disabled. " +
+ "Stopping multipart upload processing for bucket {}.",
bucketName);
+ return;
+ }
+
+ Table.KeyValue<String, OmMultipartKeyInfo> entry =
mpuIterator.next();
+ OmMultipartKeyInfo mpuKeyInfo = entry.getValue();
+
+ // Extract multipart upload information from the key
+ OmMultipartUpload upload = OmMultipartUpload.from(entry.getKey());
Review Comment:
Catch the `IllegalArgumentException` and skip the MPU.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -743,6 +750,156 @@ private void evaluateBucket(OmBucketInfo bucketInfo,
}
}
+ /**
+ * Process AbortIncompleteMultipartUpload actions for incomplete multipart
uploads.
+ * Iterates through the multipartInfoTable and aborts uploads that match
the rule criteria
+ * and have exceeded the configured days after initiation.
+ *
+ * @param bucketInfo the bucket information
+ * @param ruleList list of lifecycle rules to evaluate
+ */
+ private void processMultipartUploads(OmBucketInfo bucketInfo,
List<OmLCRule> ruleList) {
+ // Filter rules that have AbortIncompleteMultipartUpload actions
+ List<OmLCRule> mpuRules = ruleList.stream()
+ .filter(r -> r.getAbortIncompleteMultipartUpload() != null)
+ .collect(Collectors.toList());
+
+ if (mpuRules.isEmpty()) {
+ return;
+ }
+
+ String volumeName = bucketInfo.getVolumeName();
+ String bucketName = bucketInfo.getBucketName();
+ String bucketPrefix = omMetadataManager.getMultipartKey(volumeName,
bucketName, "", "");
+
+ LOG.debug("Processing AbortIncompleteMultipartUpload actions for bucket
{}/{}", volumeName, bucketName);
+
+ List<OmMultipartUpload> expiredUploads = new ArrayList<>();
+
+ try (TableIterator<String, ? extends Table.KeyValue<String,
OmMultipartKeyInfo>> mpuIterator =
+
omMetadataManager.getMultipartInfoTable().iterator(bucketPrefix)) {
+ while (mpuIterator.hasNext()) {
+ if (!shouldRun()) {
+ LOG.info("KeyLifecycleService is suspended or disabled. " +
+ "Stopping multipart upload processing for bucket {}.",
bucketName);
+ return;
+ }
+
+ Table.KeyValue<String, OmMultipartKeyInfo> entry =
mpuIterator.next();
+ OmMultipartKeyInfo mpuKeyInfo = entry.getValue();
+
+ // Extract multipart upload information from the key
+ OmMultipartUpload upload = OmMultipartUpload.from(entry.getKey());
+
upload.setCreationTime(Instant.ofEpochMilli(mpuKeyInfo.getCreationTime()));
+ String keyName = upload.getKeyName();
+
+ // Check each rule to see if this upload should be aborted
+ for (OmLCRule rule : mpuRules) {
+ if (shouldAbortUpload(upload, keyName, rule)) {
+ expiredUploads.add(upload);
Review Comment:
Let's try to break the abort/delete down into multiple batches; see
`ozone.lifecycle.service.delete.batch-size` and its usage in the normal key
expiry flow (e.g. `LimitedExpiredObjectList`).
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -743,6 +750,156 @@ private void evaluateBucket(OmBucketInfo bucketInfo,
}
}
+ /**
+ * Process AbortIncompleteMultipartUpload actions for incomplete multipart
uploads.
+ * Iterates through the multipartInfoTable and aborts uploads that match
the rule criteria
+ * and have exceeded the configured days after initiation.
+ *
+ * @param bucketInfo the bucket information
+ * @param ruleList list of lifecycle rules to evaluate
+ */
+ private void processMultipartUploads(OmBucketInfo bucketInfo,
List<OmLCRule> ruleList) {
Review Comment:
Please add a test in `TestKeyLifecycleService` to test this behavior.
##########
hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationGet.java:
##########
@@ -83,6 +83,47 @@ public void testGetLifecycleConfiguration() throws Exception
{
lcc.getRules().get(0).getExpiration().getDays().intValue());
}
+ @Test
+ public void testGetLifecycleWithAbortIncompleteMultipartUpload() throws
Exception {
Review Comment:
Let's add or modify the test in `AbstractS3SDKV1Tests` and
`AbstractS3SDKV2Tests` to test the E2E flow.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -743,6 +750,156 @@ private void evaluateBucket(OmBucketInfo bucketInfo,
}
}
+ /**
+ * Process AbortIncompleteMultipartUpload actions for incomplete multipart
uploads.
+ * Iterates through the multipartInfoTable and aborts uploads that match
the rule criteria
+ * and have exceeded the configured days after initiation.
+ *
+ * @param bucketInfo the bucket information
+ * @param ruleList list of lifecycle rules to evaluate
+ */
+ private void processMultipartUploads(OmBucketInfo bucketInfo,
List<OmLCRule> ruleList) {
+ // Filter rules that have AbortIncompleteMultipartUpload actions
+ List<OmLCRule> mpuRules = ruleList.stream()
+ .filter(r -> r.getAbortIncompleteMultipartUpload() != null)
+ .collect(Collectors.toList());
+
+ if (mpuRules.isEmpty()) {
+ return;
+ }
+
+ String volumeName = bucketInfo.getVolumeName();
+ String bucketName = bucketInfo.getBucketName();
+ String bucketPrefix = omMetadataManager.getMultipartKey(volumeName,
bucketName, "", "");
+
+ LOG.debug("Processing AbortIncompleteMultipartUpload actions for bucket
{}/{}", volumeName, bucketName);
+
+ List<OmMultipartUpload> expiredUploads = new ArrayList<>();
+
+ try (TableIterator<String, ? extends Table.KeyValue<String,
OmMultipartKeyInfo>> mpuIterator =
+
omMetadataManager.getMultipartInfoTable().iterator(bucketPrefix)) {
+ while (mpuIterator.hasNext()) {
+ if (!shouldRun()) {
+ LOG.info("KeyLifecycleService is suspended or disabled. " +
+ "Stopping multipart upload processing for bucket {}.",
bucketName);
+ return;
+ }
+
+ Table.KeyValue<String, OmMultipartKeyInfo> entry =
mpuIterator.next();
+ OmMultipartKeyInfo mpuKeyInfo = entry.getValue();
+
+ // Extract multipart upload information from the key
+ OmMultipartUpload upload = OmMultipartUpload.from(entry.getKey());
+
upload.setCreationTime(Instant.ofEpochMilli(mpuKeyInfo.getCreationTime()));
+ String keyName = upload.getKeyName();
+
+ // Check each rule to see if this upload should be aborted
+ for (OmLCRule rule : mpuRules) {
+ if (shouldAbortUpload(upload, keyName, rule)) {
+ expiredUploads.add(upload);
+ LOG.debug("Multipart upload {}/{}/{} with uploadId {} will be
aborted",
+ volumeName, bucketName, keyName, upload.getUploadId());
+ break; // One rule match is enough
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to iterate multipartInfoTable for bucket {}/{}",
volumeName, bucketName, e);
+ return;
+ }
+
+ if (!expiredUploads.isEmpty()) {
+ LOG.info("{} expired multipart uploads found for bucket {}/{}",
+ expiredUploads.size(), volumeName, bucketName);
+ abortExpiredMultipartUploads(bucketInfo, expiredUploads);
+ }
+ }
+
+ /**
+ * Check if a multipart upload should be aborted based on the lifecycle
rule.
+ *
+ * @param upload the multipart upload information
+ * @param keyName the key name of the upload
+ * @param rule the lifecycle rule to evaluate against
+ * @return true if the upload should be aborted, false otherwise
+ */
+ private boolean shouldAbortUpload(OmMultipartUpload upload, String
keyName, OmLCRule rule) {
+ // Check if upload age exceeds the threshold
+ if (!rule.getAbortIncompleteMultipartUpload().shouldAbort(
+ upload.getCreationTime().toEpochMilli())) {
+ return false;
+ }
+
+ // Check prefix matching
+ String effectivePrefix = rule.getEffectivePrefix();
+ if (effectivePrefix != null && !keyName.startsWith(effectivePrefix)) {
+ return false;
+ }
+
+ // TODO: Add tag filtering support when multipart uploads support tags
Review Comment:
Multipart uploads should already support tagging, so we can add tag-based
filtering here instead of leaving it as a TODO.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]