the-other-tim-brown commented on code in PR #18159:
URL: https://github.com/apache/hudi/pull/18159#discussion_r2843878532
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -614,4 +660,299 @@ private boolean isFileGroupInPendingMajorOrMinorCompaction(HoodieFileGroup fg) {
   private boolean noSubsequentReplaceCommit(String earliestCommitToRetain, String partitionPath) {
     return !hoodieTable.getHoodieView().getReplacedFileGroupsAfterOrOn(earliestCommitToRetain, partitionPath).findAny().isPresent();
   }
+
+  private <R> List<CleanFileInfo> getBlobFilesToRemove(HoodieTable table, HoodieSchema schema, Option<HoodieSchema> requestedSchema,
+                                                       Option<List<FileSlice>> retainedFileSlices, Option<List<FileSlice>> removedFileSlices) {
+    if (requestedSchema.isEmpty()) {
+      // no blob columns, no blob files to clean
+      return Collections.emptyList();
+    }
+    // Skip if there are no removed file slices
+    if (removedFileSlices.isEmpty()) {
+      return Collections.emptyList();
+    }
+    // Validate there is at least one completed commit
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    Option<String> latestCommitTimeOpt = metaClient.getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime);
+    if (!latestCommitTimeOpt.isPresent()) {
+      // no commits, so no blob files to clean
+      return Collections.emptyList();
+    }
+    Map<HoodieFileGroupId, List<FileSlice>> retainedFileSlicesByFileGroupId = retainedFileSlices.orElseThrow(() -> new HoodieException("Retained file slices must be set"))
+        .stream().collect(Collectors.groupingBy(FileSlice::getFileGroupId));
+    Map<HoodieFileGroupId, List<FileSlice>> removedFileSlicesByFileGroupId = removedFileSlices.orElseThrow(() -> new HoodieException("Removed file slices must be set"))
+        .stream().collect(Collectors.groupingBy(FileSlice::getFileGroupId));
+
+    return getFileSliceComparisonGroups(retainedFileSlicesByFileGroupId, removedFileSlicesByFileGroupId).stream().flatMap(group -> {
+      HoodieReaderContext<R> readerContext = ((ReaderContextFactory<R>) table.getContext().getReaderContextFactory(metaClient)).getContext();
+      RecordContext<R> recordContext = readerContext.getRecordContext();
+      // Iterate through the removed file slices with skip merging to find all the blob files that are referenced by the removed file slices.
+      TypedProperties properties = TypedProperties.copy(config.getProps());
+      properties.put(HoodieReaderConfig.MERGE_TYPE.key(), REALTIME_SKIP_MERGE);
+
+      Set<String> managedBlobFilePaths = group.getRemovedFileSlices().stream()
+          .flatMap(fileSlice -> {
+            HoodieFileGroupReader<R> reader = getHoodieFileGroupReader(schema, fileSlice, readerContext, metaClient, latestCommitTimeOpt, requestedSchema.get(), properties);
+            Set<String> managedBlobFilePathsInSlice = new HashSet<>();
+            try (ClosableIterator<R> recordItr = reader.getClosableIterator()) {
+              while (recordItr.hasNext()) {
+                R record = recordItr.next();
+                managedBlobFilePathsInSlice.addAll(getManagedBlobPaths(schema, record, recordContext));
+              }
+            } catch (IOException e) {
+              throw new HoodieIOException("Error reading records from file slice: " + fileSlice, e);
+            }
+            return managedBlobFilePathsInSlice.stream();
+          })
+          .collect(Collectors.toSet());
+      if (managedBlobFilePaths.isEmpty()) {
+        // no blob files referenced by the removed file slices, skip
+        return Stream.empty();
+      }
+      // Then iterate through the retained file slices with skip merging to find all the blob files that are still referenced by the retained file slices.
+      group.getRetainedFileSlices().forEach(fileSlice -> {
+        HoodieFileGroupReader<R> reader = getHoodieFileGroupReader(schema, fileSlice, readerContext, metaClient, latestCommitTimeOpt, requestedSchema.get(), properties);
+        try (ClosableIterator<R> recordItr = reader.getClosableIterator()) {
+          while (recordItr.hasNext()) {
+            R record = recordItr.next();
+            getManagedBlobPaths(schema, record, recordContext).forEach(managedBlobFilePaths::remove);
+            if (managedBlobFilePaths.isEmpty()) {
+              // all blob files referenced by the removed file slices are still referenced by the retained file slices, skip
+              break;
+            }
+          }
+        } catch (IOException e) {
+          throw new HoodieIOException("Error reading records from file slice: " + fileSlice, e);
+        }
+      });
+
+      // The remaining blob file paths in managedBlobFilePaths are the ones that can be removed.
+      return managedBlobFilePaths.stream().map(path -> new CleanFileInfo(path, false));
+    }).collect(Collectors.toList());
+  }
+
+  private <R> HoodieFileGroupReader<R> getHoodieFileGroupReader(HoodieSchema schema, FileSlice fileSlice, HoodieReaderContext<R> readerContext, HoodieTableMetaClient metaClient,
+                                                                Option<String> latestCommitTimeOpt, HoodieSchema requestedSchema, TypedProperties props) {
+    return HoodieFileGroupReader.<R>newBuilder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(metaClient)
+        .withLatestCommitTime(latestCommitTimeOpt.get())
+        .withFileSlice(fileSlice)
+        .withDataSchema(schema)
+        .withRequestedSchema(requestedSchema)
+        .withProps(props)
+        .build();
+  }
+
+  // TODO: How does this work for tables with global index?
+  /**
+   * Cleaning blob files requires that we inspect the contents of the retained and removed file slices to find the blob file references that can be removed.
+   * To optimize the process, we limit the comparisons required with the following grouping logic:
+   * 1) If a removed file slice has a corresponding retained file slice in the same file group, we will compare the removed file slice with the retained file slice(s) in the same file group.
+   * 2) If a removed file slice does not have a corresponding retained file slice in the same file group, this implies that there is a replace or clustering commit. This requires comparing the
+   *    removed file slice with all retained file slices that were created after the removed file slice's latest instant time, as these are the only retained file slices
+   *    that could be part of a replace or clustering commit that removes these files from the active view of the table. All slices that fall into this case are put into the same comparison group to
+   *    avoid reading the same file multiple times.
+   * @param retainedFileSlicesByFileGroupId the map of retained file slices grouped by HoodieFileGroupId
+   * @param removedFileSlicesByFileGroupId the map of removed file slices grouped by HoodieFileGroupId
+   * @return the list of FileSliceComparisonGroup which contains the groups of file slices to compare for finding dereferenced blobs
+   */
+  @VisibleForTesting
+  static List<FileSliceComparisonGroup> getFileSliceComparisonGroups(Map<HoodieFileGroupId, List<FileSlice>> retainedFileSlicesByFileGroupId,
+                                                                     Map<HoodieFileGroupId, List<FileSlice>> removedFileSlicesByFileGroupId) {
+    List<FileSliceComparisonGroup> groupings = new ArrayList<>();
+    List<List<FileSlice>> removedFileSlicesWithoutRetainedSlices = new ArrayList<>();
+    removedFileSlicesByFileGroupId.keySet().forEach(fileGroupId -> {
+      List<FileSlice> removedSlices = removedFileSlicesByFileGroupId.get(fileGroupId);
+      List<FileSlice> retainedSlices = retainedFileSlicesByFileGroupId.get(fileGroupId);
+      if (retainedSlices == null) {
+        // This is due to a replace commit or clustering, so we must handle this case separately
+        removedFileSlicesWithoutRetainedSlices.add(removedSlices);
+      } else {
+        groupings.add(FileSliceComparisonGroup.builder().removedFileSlices(new HashSet<>(removedSlices)).retainedFileSlices(new HashSet<>(retainedSlices)).build());
+      }
+    });
+    if (!removedFileSlicesWithoutRetainedSlices.isEmpty()) {
+      // File slices that do not have a retained file slice in the same file group are due to a replace commit or clustering, so we will compare them against all retained file slices created after the oldest removed file slice.
+      String instant = removedFileSlicesWithoutRetainedSlices.stream()
+          // for each file group we get the latest instant time
+          .map(fileSlices -> fileSlices.stream().map(FileSlice::getLatestInstantTime).reduce(InstantComparison::maxInstant)
+              .orElseThrow(() -> new HoodieException("File slices should have at least one file")))
+          .reduce(InstantComparison::minInstant).orElseThrow(() -> new HoodieException("There should be at least one file slice to remove"));
+      // find the file groups created after this instant and get the retained file slices for those file groups
+      Set<FileSlice> retainedFileSlicesRequiringComparison = retainedFileSlicesByFileGroupId.entrySet().stream()
+          .filter(entry -> entry.getValue().stream().allMatch(fileSlice -> compareTimestamps(fileSlice.getBaseInstantTime(), GREATER_THAN, instant)))
Review Comment:
   The base instant time is used here because this filter is meant to catch file groups that were potentially created by the replace commit that removed the other file group.
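
   To make that concrete, here is a minimal standalone sketch of the case-2 filter. It uses plain Java with string instants standing in for Hudi's `FileSlice`/`InstantComparison` types, so the file group ids and instant times are hypothetical and each retained group is simplified to a single base instant:

   ```java
   import java.util.Comparator;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.stream.Collectors;

   public class ComparisonGroupSketch {
     public static void main(String[] args) {
       // fg-1 was removed by a replace commit and has no retained slice; these
       // are the latest instant times of its removed file slices
       Map<String, List<String>> removedWithoutRetained = Map.of("fg-1", List.of("10", "30"));
       // base instant times of the retained file groups
       Map<String, String> retainedBaseInstants = Map.of("fg-2", "40", "fg-3", "20");

       // mirrors the maxInstant-then-minInstant reduction above: per removed-only
       // file group take the latest instant, then take the oldest across groups
       String instant = removedWithoutRetained.values().stream()
           .map(times -> times.stream().max(Comparator.naturalOrder()).orElseThrow())
           .min(Comparator.naturalOrder())
           .orElseThrow(); // "30"

       // keep only retained file groups whose BASE instant is strictly greater,
       // i.e. groups that could have been written by the replace commit itself
       Set<String> requiresComparison = retainedBaseInstants.entrySet().stream()
           .filter(e -> e.getValue().compareTo(instant) > 0)
           .map(Map.Entry::getKey)
           .collect(Collectors.toSet());

       System.out.println(requiresComparison); // prints [fg-2]
     }
   }
   ```

   As I read the intent, `fg-3`'s base instant predates the replace commit, so it cannot be an output of that commit holding forwarded references to `fg-1`'s blobs, and it is safely excluded from the scan.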