the-other-tim-brown commented on code in PR #18159:
URL: https://github.com/apache/hudi/pull/18159#discussion_r2843554274
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -614,4 +660,299 @@ private boolean
isFileGroupInPendingMajorOrMinorCompaction(HoodieFileGroup fg) {
/**
 * Returns true when no file group in the given partition was replaced by a commit
 * at or after the earliest commit to retain.
 */
private boolean noSubsequentReplaceCommit(String earliestCommitToRetain, String partitionPath) {
  boolean anyReplacedAfterOrOn = hoodieTable.getHoodieView()
      .getReplacedFileGroupsAfterOrOn(earliestCommitToRetain, partitionPath)
      .findAny()
      .isPresent();
  return !anyReplacedAfterOrOn;
}
+
+ private <R> List<CleanFileInfo> getBlobFilesToRemove(HoodieTable table,
HoodieSchema schema, Option<HoodieSchema> requestedSchema,
+ Option<List<FileSlice>>
retainedFileSlices, Option<List<FileSlice>> removedFileSlices) {
+ if (requestedSchema.isEmpty()) {
+ // no blob columns, no blob files to clean
+ return Collections.emptyList();
+ }
+ // Skip if there are no removed file slices
+ if (removedFileSlices.isEmpty()) {
+ return Collections.emptyList();
+ }
+ // Validate there is at least one completed commit
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ Option<String> latestCommitTimeOpt =
metaClient.getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime);
+ if (!latestCommitTimeOpt.isPresent()) {
+ // no commits or blob files to clean
+ return Collections.emptyList();
+ }
+ Map<HoodieFileGroupId, List<FileSlice>> retainedFileSlicesByFileGroupId =
retainedFileSlices.orElseThrow(() -> new HoodieException("Retained file slices
must be set"))
+ .stream().collect(Collectors.groupingBy(FileSlice::getFileGroupId));
+ Map<HoodieFileGroupId, List<FileSlice>> removedFileSlicesByFileGroupId =
removedFileSlices.orElseThrow(() -> new HoodieException("Removed file slices
must be set"))
+ .stream().collect(Collectors.groupingBy(FileSlice::getFileGroupId));
+
+ return getFileSliceComparisonGroups(retainedFileSlicesByFileGroupId,
removedFileSlicesByFileGroupId).stream().flatMap(group -> {
+ HoodieReaderContext<R> readerContext = ((ReaderContextFactory<R>)
table.getContext().getReaderContextFactory(metaClient)).getContext();
+ RecordContext<R> recordContext = readerContext.getRecordContext();
+ // Iterate through the removed file slices with skip merging to find all
the blob files that are referenced by the removed file slices.
+ TypedProperties properties = TypedProperties.copy(config.getProps());
+ properties.put(HoodieReaderConfig.MERGE_TYPE.key(), REALTIME_SKIP_MERGE);
+
+ Set<String> managedBlobFilePaths = group.getRemovedFileSlices().stream()
+ .flatMap(fileSlice -> {
+ HoodieFileGroupReader<R> reader = getHoodieFileGroupReader(schema,
fileSlice, readerContext, metaClient, latestCommitTimeOpt,
requestedSchema.get(), properties);
+ Set<String> managedBlobFilePathsInSlice = new HashSet<>();
+ try (ClosableIterator<R> recordItr = reader.getClosableIterator())
{
+ while (recordItr.hasNext()) {
+ R record = recordItr.next();
+ managedBlobFilePathsInSlice.addAll(getManagedBlobPaths(schema,
record, recordContext));
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Error reading records from file
slice: " + fileSlice, e);
+ }
+ return managedBlobFilePathsInSlice.stream();
+ })
+ .collect(Collectors.toSet());
+ if (managedBlobFilePaths.isEmpty()) {
+ // no blob files referenced by the removed file slices, skip
+ return Stream.empty();
+ }
+ // Then iterate through the retained file slices with skip merging to
find all the blob files that are still referenced by the retained file slices.
+ group.getRetainedFileSlices().forEach(fileSlice -> {
+ HoodieFileGroupReader<R> reader = getHoodieFileGroupReader(schema,
fileSlice, readerContext, metaClient, latestCommitTimeOpt,
requestedSchema.get(), properties);
+ try (ClosableIterator<R> recordItr = reader.getClosableIterator()) {
+ while (recordItr.hasNext()) {
+ R record = recordItr.next();
+ getManagedBlobPaths(schema, record,
recordContext).forEach(managedBlobFilePaths::remove);
+ if (managedBlobFilePaths.isEmpty()) {
+ // all blob files referenced by the removed file slices are
still referenced by the retained file slices, skip
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Error reading records from file slice:
" + fileSlice, e);
+ }
+ });
+
+ // The remaining blob file paths in managedBlobFilePaths are the ones
that can be removed.
+ return managedBlobFilePaths.stream().map(path -> new CleanFileInfo(path,
false));
+ }).collect(Collectors.toList());
+ }
+
+ private <R> HoodieFileGroupReader<R> getHoodieFileGroupReader(HoodieSchema
schema, FileSlice fileSlice, HoodieReaderContext<R> readerContext,
HoodieTableMetaClient metaClient,
+ Option<String>
latestCommitTimeOpt, HoodieSchema requestedSchema, TypedProperties props) {
+ return HoodieFileGroupReader.<R>newBuilder()
+ .withReaderContext(readerContext)
+ .withHoodieTableMetaClient(metaClient)
+ .withLatestCommitTime(latestCommitTimeOpt.get())
+ .withFileSlice(fileSlice)
+ .withDataSchema(schema)
+ .withRequestedSchema(requestedSchema)
+ .withProps(props)
+ .build();
+ }
+
+ // TODO: How does this work for tables with global index?
+ /**
+ * Cleaning blob files requires that we inspect the contents of the retained
and removed file slices to find the blob file references that can be removed.
+ * To optimize the process, we limit the comparisons required with the
following grouping logic:
+ * 1) If a removed file slice has a corresponding retained file slice in the
same file group, we will compare the removed file slice with the retained file
slice(s) in the same file group.
+ * 2) If a removed file slice does not have a corresponding retained file
slice in the same file group, this implies that there is a replace or
clustering commit. This requires comparing the
+ * removed file slice with all retained file slices that were created
after the removed file slice's latest instant time, as these are the only
retained file slices
Review Comment:
I don't think there is a clean mapping available for this. If there were, we
could use it to avoid inspecting all the files written after the removed file
group.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]