the-other-tim-brown commented on code in PR #18159:
URL: https://github.com/apache/hudi/pull/18159#discussion_r2843554274


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -614,4 +660,299 @@ private boolean isFileGroupInPendingMajorOrMinorCompaction(HoodieFileGroup fg) {
   private boolean noSubsequentReplaceCommit(String earliestCommitToRetain, String partitionPath) {
     return !hoodieTable.getHoodieView().getReplacedFileGroupsAfterOrOn(earliestCommitToRetain, partitionPath).findAny().isPresent();
   }
+
+  private <R> List<CleanFileInfo> getBlobFilesToRemove(HoodieTable table, HoodieSchema schema, Option<HoodieSchema> requestedSchema,
+                                                       Option<List<FileSlice>> retainedFileSlices, Option<List<FileSlice>> removedFileSlices) {
+    if (requestedSchema.isEmpty()) {
+      // no blob columns, no blob files to clean
+      return Collections.emptyList();
+    }
+    // Skip if there are no removed file slices
+    if (removedFileSlices.isEmpty()) {
+      return Collections.emptyList();
+    }
+    // Validate there is at least one completed commit
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    Option<String> latestCommitTimeOpt = metaClient.getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime);
+    if (!latestCommitTimeOpt.isPresent()) {
+      // no commits or blob files to clean
+      return Collections.emptyList();
+    }
+    Map<HoodieFileGroupId, List<FileSlice>> retainedFileSlicesByFileGroupId = retainedFileSlices.orElseThrow(() -> new HoodieException("Retained file slices must be set"))
+        .stream().collect(Collectors.groupingBy(FileSlice::getFileGroupId));
+    Map<HoodieFileGroupId, List<FileSlice>> removedFileSlicesByFileGroupId = removedFileSlices.orElseThrow(() -> new HoodieException("Removed file slices must be set"))
+        .stream().collect(Collectors.groupingBy(FileSlice::getFileGroupId));
+
+    return getFileSliceComparisonGroups(retainedFileSlicesByFileGroupId, removedFileSlicesByFileGroupId).stream().flatMap(group -> {
+      HoodieReaderContext<R> readerContext = ((ReaderContextFactory<R>) table.getContext().getReaderContextFactory(metaClient)).getContext();
+      RecordContext<R> recordContext = readerContext.getRecordContext();
+      // Iterate through the removed file slices with skip merging to find all the blob files that are referenced by the removed file slices.
+      TypedProperties properties = TypedProperties.copy(config.getProps());
+      properties.put(HoodieReaderConfig.MERGE_TYPE.key(), REALTIME_SKIP_MERGE);
+
+      Set<String> managedBlobFilePaths = group.getRemovedFileSlices().stream()
+          .flatMap(fileSlice -> {
+            HoodieFileGroupReader<R> reader = getHoodieFileGroupReader(schema, fileSlice, readerContext, metaClient, latestCommitTimeOpt, requestedSchema.get(), properties);
+            Set<String> managedBlobFilePathsInSlice = new HashSet<>();
+            try (ClosableIterator<R> recordItr = reader.getClosableIterator()) {
+              while (recordItr.hasNext()) {
+                R record = recordItr.next();
+                managedBlobFilePathsInSlice.addAll(getManagedBlobPaths(schema, record, recordContext));
+              }
+            } catch (IOException e) {
+              throw new HoodieIOException("Error reading records from file slice: " + fileSlice, e);
+            }
+            return managedBlobFilePathsInSlice.stream();
+          })
+          .collect(Collectors.toSet());
+      if (managedBlobFilePaths.isEmpty()) {
+        // no blob files referenced by the removed file slices, skip
+        return Stream.empty();
+      }
+      // Then iterate through the retained file slices with skip merging to find all the blob files that are still referenced by the retained file slices.
+      group.getRetainedFileSlices().forEach(fileSlice -> {
+        HoodieFileGroupReader<R> reader = getHoodieFileGroupReader(schema, fileSlice, readerContext, metaClient, latestCommitTimeOpt, requestedSchema.get(), properties);
+        try (ClosableIterator<R> recordItr = reader.getClosableIterator()) {
+          while (recordItr.hasNext()) {
+            R record = recordItr.next();
+            getManagedBlobPaths(schema, record, recordContext).forEach(managedBlobFilePaths::remove);
+            if (managedBlobFilePaths.isEmpty()) {
+              // all blob files referenced by the removed file slices are still referenced by the retained file slices, skip
+              break;
+            }
+          }
+        } catch (IOException e) {
+          throw new HoodieIOException("Error reading records from file slice: " + fileSlice, e);
+        }
+      });
+
+      // The remaining blob file paths in managedBlobFilePaths are the ones that can be removed.
+      return managedBlobFilePaths.stream().map(path -> new CleanFileInfo(path, false));
+    }).collect(Collectors.toList());
+  }
+
+  private <R> HoodieFileGroupReader<R> getHoodieFileGroupReader(HoodieSchema schema, FileSlice fileSlice, HoodieReaderContext<R> readerContext, HoodieTableMetaClient metaClient,
+                                                                Option<String> latestCommitTimeOpt, HoodieSchema requestedSchema, TypedProperties props) {
+    return HoodieFileGroupReader.<R>newBuilder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(metaClient)
+        .withLatestCommitTime(latestCommitTimeOpt.get())
+        .withFileSlice(fileSlice)
+        .withDataSchema(schema)
+        .withRequestedSchema(requestedSchema)
+        .withProps(props)
+        .build();
+  }
+
+  // TODO: How does this work for tables with global index?
+  /**
+   * Cleaning blob files requires that we inspect the contents of the retained and removed file slices to find the blob file references that can be removed.
+   * To optimize the process, we limit the comparisons required with the following grouping logic:
+   * 1) If a removed file slice has a corresponding retained file slice in the same file group, we will compare the removed file slice with the retained file slice(s) in the same file group.
+   * 2) If a removed file slice does not have a corresponding retained file slice in the same file group, this implies that there is a replace or clustering commit. This requires comparing the
+   *    removed file slice with all retained file slices that were created after the removed file slice's latest instant time, as these are the only retained file slices

Review Comment:
   I don't think there is a clean mapping available for this. If there is one, we could use it to avoid inspecting all the files written after the removed file group.
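
   To make the fallback cost concrete, here is a minimal sketch of the set-difference pass the cleaner performs per comparison group (plain Java; the inputs are simplified stand-ins for the blob paths read out of each file slice, not the PR's actual types):

   ```java
   // Sketch only: each inner list models the managed blob paths referenced by
   // one file slice; in the PR these come from reading the slice with skip merging.
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   class BlobCleanSketch {
     // Collect every blob path referenced by the removed slices, then drop any
     // path a retained slice still references; the remainder is safe to delete.
     static Set<String> orphanedBlobPaths(List<List<String>> removedSliceRefs,
                                          List<List<String>> retainedSliceRefs) {
       Set<String> candidates = new HashSet<>();
       removedSliceRefs.forEach(candidates::addAll);
       for (List<String> refs : retainedSliceRefs) {
         refs.forEach(candidates::remove);
         if (candidates.isEmpty()) {
           break; // every candidate is still referenced, nothing to clean
         }
       }
       return candidates;
     }
   }
   ```

   Without a mapping, `retainedSliceRefs` has to cover every retained file slice written after the removed one, which is exactly the scan in question; a file-group-to-blob index would shrink that second pass to a direct lookup.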



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
