ivandika3 commented on code in PR #10052:
URL: https://github.com/apache/ozone/pull/10052#discussion_r3142979078


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java:
##########
@@ -743,6 +753,208 @@ private void evaluateBucket(OmBucketInfo bucketInfo,
       }
     }
 
+    /**
+     * Processes AbortIncompleteMultipartUpload actions for a single bucket.
+     *
+     * <p>Iterates the multipartInfoTable entries under the bucket's prefix and
+     * collects every upload for which {@code shouldAbortUpload} returns true for
+     * at least one of the rules. Collected uploads are handed to
+     * {@code abortExpiredMultipartUploadsAndClear} whenever the batch list is
+     * full, and once more after the scan for any remainder.
+     *
+     * <p>Entries whose table key cannot be parsed, whose open key cannot be
+     * resolved, or whose open key is missing from the open key table are logged
+     * and skipped. The method returns early, without flushing the uploads
+     * already collected in the pending batch, when {@code shouldRun()} turns
+     * false or when an IOException is thrown while iterating.
+     *
+     * @param bucketInfo the bucket information
+     * @param ruleList list of lifecycle rules to evaluate; rules without an
+     *                 AbortIncompleteMultipartUpload action are ignored
+     */
+    private void processMultipartUploads(OmBucketInfo bucketInfo, 
List<OmLCRule> ruleList) {
+      // Filter rules that have AbortIncompleteMultipartUpload actions
+      List<OmLCRule> mpuRules = ruleList.stream()
+          .filter(r -> r.getAbortIncompleteMultipartUpload() != null)
+          .collect(Collectors.toList());
+
+      if (mpuRules.isEmpty()) {
+        return;
+      }
+
+      String volumeName = bucketInfo.getVolumeName();
+      String bucketName = bucketInfo.getBucketName();
+      String bucketPrefix = omMetadataManager.getMultipartKey(volumeName, 
bucketName, "", "");
+
+      LOG.debug("Processing AbortIncompleteMultipartUpload actions for bucket 
{}/{}", volumeName, bucketName);
+
+      LimitedSizeList<OmMultipartUpload> expiredUploads = new 
LimitedSizeList<>(listMaxSize);
+
+      try (TableIterator<String, ? extends Table.KeyValue<String, 
OmMultipartKeyInfo>> mpuIterator =
+               
omMetadataManager.getMultipartInfoTable().iterator(bucketPrefix)) {
+        while (mpuIterator.hasNext()) {
+          if (!shouldRun()) {
+            LOG.info("KeyLifecycleService is suspended or disabled. " +
+                "Stopping multipart upload processing for bucket {}.", 
bucketName);
+            return;
+          }
+
+          Table.KeyValue<String, OmMultipartKeyInfo> entry = 
mpuIterator.next();
+          OmMultipartKeyInfo mpuKeyInfo = entry.getValue();
+          numMultipartUploadIterated++;
+
+          // Extract multipart upload information from the key
+          OmMultipartUpload upload;
+          try {
+            upload = OmMultipartUpload.from(entry.getKey());
+          } catch (IllegalArgumentException e) {
+            LOG.warn("Failed to parse multipart upload key {} in bucket {}/{}, 
skipping",
+                entry.getKey(), volumeName, bucketName, e);
+            continue;
+          }
+
+          
upload.setCreationTime(Instant.ofEpochMilli(mpuKeyInfo.getCreationTime()));
+          String keyName = upload.getKeyName();
+
+          // Resolve the open key for this upload; its OmKeyInfo is what the rules are evaluated against below
+          String multipartOpenKey;
+          try {
+            multipartOpenKey = OMMultipartUploadUtils.getMultipartOpenKey(
+                volumeName, bucketName, keyName, upload.getUploadId(),
+                omMetadataManager, bucketInfo.getBucketLayout());
+          } catch (OMException e) {
+            LOG.warn("Failed to get multipart open key for {}/{}/{}, skipping",
+                volumeName, bucketName, keyName, e);
+            continue;
+          }
+
+          OmKeyInfo openKeyInfo = 
omMetadataManager.getOpenKeyTable(bucketInfo.getBucketLayout())
+              .get(multipartOpenKey);
+          if (openKeyInfo == null) {
+            LOG.warn("Open key not found for multipart upload {}/{}/{}, 
skipping",
+                volumeName, bucketName, keyName);
+            continue;
+          }
+
+          // Check each rule to see if this upload should be aborted
+          for (OmLCRule rule : mpuRules) {
+            if (shouldAbortUpload(openKeyInfo, upload, keyName, rule)) {
+              if (expiredUploads.isFull()) {
+                LOG.info("Multipart upload list reached batch size {}, 
aborting current batch for bucket {}/{}",
+                    listMaxSize, volumeName, bucketName);
+                abortExpiredMultipartUploadsAndClear(bucketInfo, 
expiredUploads);
+              }
+
+              expiredUploads.add(upload);
+              LOG.debug("Multipart upload {}/{}/{} with uploadId {} will be 
aborted",
+                  volumeName, bucketName, keyName, upload.getUploadId());
+              break;  // One rule match is enough
+            }
+          }
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to iterate multipartInfoTable for bucket {}/{}", 
volumeName, bucketName, e);
+        return;
+      }
+
+      if (!expiredUploads.isEmpty()) {
+        LOG.info("{} expired multipart uploads remaining for bucket {}/{}",
+            expiredUploads.size(), volumeName, bucketName);
+        abortExpiredMultipartUploadsAndClear(bucketInfo, expiredUploads);
+      }
+    }
+
+    /**
+     * Check if a multipart upload should be aborted based on the lifecycle rule.
+     *
+     * @param openKeyInfo the open key information with tags
+     * @param upload the multipart upload information
+     * @param keyName the key name of the upload
+     * @param rule the lifecycle rule to evaluate against
+     * @return true if the upload should be aborted, false otherwise
+     */
+    private boolean shouldAbortUpload(OmKeyInfo openKeyInfo, OmMultipartUpload 
upload,
+                                      String keyName, OmLCRule rule) {
+      // Check if upload age exceeds the threshold
+      if (!rule.getAbortIncompleteMultipartUpload().shouldAbort(
+          upload.getCreationTime().toEpochMilli())) {
+        return false;
+      }
+
+      // Check prefix and tag filtering using the existing rule.match() logic
+      // The rule.match() method handles both prefix and tag filtering
+      if (!rule.match(openKeyInfo, keyName)) {
+        return false;  // Prefix or tag doesn't match
+      }

Review Comment:
   This does not seem correct: `OmLCRule#match` evaluates the rule's expiration
action instead, so it also returns false when the rule is being evaluated for an
MPU abort. Please add tests for the prefix and tag matching.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to