nastra commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2698498262
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1073,6 +1088,136 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
return cachedNewDeleteManifests;
}
+  // Merge duplicates, internally takes care of updating newDeleteFilesBySpec to remove
+  // duplicates and add the newly merged DV
+  private void mergeDVsAndWrite() {
+    Map<Integer, List<MergedDVContent>> mergedIndicesBySpec = Maps.newConcurrentMap();
+    Map<String, DeleteFileSet> dataFilesWithDuplicateDVs =
+        dvsByReferencedFile.entrySet().stream()
+            .filter(entry -> entry.getValue().size() > 1)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    Tasks.foreach(dataFilesWithDuplicateDVs.entrySet())
+        .executeWith(ThreadPools.getDeleteWorkerPool())
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(
+            entry -> {
+              String referencedLocation = entry.getKey();
+              DeleteFileSet duplicateDVs = entry.getValue();
+              MergedDVContent merged = mergePositions(referencedLocation, duplicateDVs);
+              mergedIndicesBySpec
+                  .computeIfAbsent(
+                      merged.specId, spec -> Collections.synchronizedList(Lists.newArrayList()))
+                  .add(merged);
+            });
+
+    // Update newDeleteFilesBySpec to remove all the duplicates
+    mergedIndicesBySpec.forEach(
+        (specId, mergedDVContent) -> {
+          mergedDVContent.stream()
+              .map(content -> content.duplicateDVs)
+              .forEach(duplicateDVs -> newDeleteFilesBySpec.get(specId).removeAll(duplicateDVs));
+        });
+
+    writeMergedDVs(mergedIndicesBySpec);
+  }
+
+  // Produces a Puffin per partition spec containing the merged DVs for that spec
+  private void writeMergedDVs(Map<Integer, List<MergedDVContent>> mergedDVContentBySpec) {
+    Map<Integer, DeleteFileSet> mergedDVsBySpec = Maps.newHashMap();
+
+    mergedDVContentBySpec.forEach(
+        (specId, mergedDVsForSpec) -> {
+          try (DVFileWriter dvFileWriter =
+              new BaseDVFileWriter(
+                  OutputFileFactory.builderFor(ops(), spec(specId), FileFormat.PUFFIN, 1, 1)
+                      .build(),
+                  path -> null)) {
+
+            for (MergedDVContent mergedDV : mergedDVsForSpec) {
+              LOG.warn(
+                  "Merged {} duplicate deletion vectors for data file {} in table {}. The duplicate DVs are orphaned, and writers should merge DVs per file before committing",
+                  mergedDV.duplicateDVs.size(),
+                  mergedDV.referencedLocation,
+                  tableName);
+              dvFileWriter.delete(
+                  mergedDV.referencedLocation,
+                  mergedDV.mergedPositions,
+                  spec(mergedDV.specId),
+                  mergedDV.partition);
+            }
+
+            dvFileWriter.close();
+            DeleteWriteResult result = dvFileWriter.result();
+
+            DeleteFileSet dvsForSpec =
+                mergedDVsBySpec.computeIfAbsent(specId, k -> DeleteFileSet.create());
+            dvsForSpec.addAll(
+                result.deleteFiles().stream()
+                    .map(file -> Delegates.pendingDeleteFile(file, file.dataSequenceNumber()))
+                    .collect(Collectors.toList()));
+
+            // Add the merged DV to the delete files by spec
+            newDeleteFilesBySpec.get(specId).addAll(dvsForSpec);
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+  }
+
+  // Data class for referenced file, DVs that were merged, the merged position delete index,
+  // partition spec and tuple
+  private static class MergedDVContent {
+    private DeleteFileSet duplicateDVs;
+    private String referencedLocation;
+    private PositionDeleteIndex mergedPositions;
+    private int specId;
+    private StructLike partition;
+
+    MergedDVContent(
+        String referencedLocation,
+        DeleteFileSet duplicateDVs,
+        PositionDeleteIndex mergedPositions,
+        int specId,
+        StructLike partition) {
+      this.referencedLocation = referencedLocation;
+      this.duplicateDVs = duplicateDVs;
+      this.mergedPositions = mergedPositions;
+      this.specId = specId;
+      this.partition = partition;
+    }
+  }
+
+  // Merges the position indices for the duplicate DVs for a given referenced file
+  private MergedDVContent mergePositions(String referencedLocation, DeleteFileSet duplicateDVs) {
+    Iterator<DeleteFile> dvIterator = duplicateDVs.iterator();
+    DeleteFile firstDV = dvIterator.next();
+    PositionDeleteIndex mergedPositions = Deletes.readDV(firstDV, ops().io(), ops().encryption());
+    while (dvIterator.hasNext()) {
+      DeleteFile dv = dvIterator.next();
+      Preconditions.checkArgument(
+          Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()),
+          "Cannot merge duplicate added DVs when data sequence numbers are different,"
Review Comment:
```suggestion
"Cannot merge duplicate added DVs when data sequence numbers are
different, "
```
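
For context: Java only joins string literals through an explicit `+`, so without the trailing space the two halves of the message run together. A minimal sketch of the effect, assuming the message is continued with another literal on the next line of the diff (the continuation text here is hypothetical, not from the PR):

```java
public class MessageConcatDemo {
  public static void main(String[] args) {
    // Hypothetical continuation of the checkArgument message, for illustration only.
    String withoutTrailingSpace =
        "Cannot merge duplicate added DVs when data sequence numbers are different,"
            + "found differing sequence numbers";
    String withTrailingSpace =
        "Cannot merge duplicate added DVs when data sequence numbers are different, "
            + "found differing sequence numbers";

    System.out.println(withoutTrailingSpace); // "...different,found differing sequence numbers"
    System.out.println(withTrailingSpace);    // "...different, found differing sequence numbers"
  }
}
```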