amogh-jahagirdar commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2680296133
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1060,19 +1086,119 @@ private List<ManifestFile> newDeleteFilesAsManifests()
{
}
if (cachedNewDeleteManifests.isEmpty()) {
+ // Found duplicates, merge them and update newDeleteFilesBySpec to
remove duplicates and add
+ // the new merged one
+ if (!duplicateDVsForDataFile.isEmpty()) {
+ Map<String, DeleteFile> mergedDVs = mergeDuplicateDVs();
+ for (Map.Entry<String, DeleteFile> mergedDV : mergedDVs.entrySet()) {
+ String referencedFile = mergedDV.getKey();
+ DeleteFile newDV = mergedDV.getValue();
+ DeleteFileSet duplicateDVs =
duplicateDVsForDataFile.get(referencedFile);
+ DeleteFileSet allDeleteFilesForSpec =
newDeleteFilesBySpec.get(newDV.specId());
+ allDeleteFilesForSpec.removeAll(duplicateDVs);
+ allDeleteFilesForSpec.add(newDV);
+ }
+ }
+
newDeleteFilesBySpec.forEach(
(specId, deleteFiles) -> {
PartitionSpec spec = ops().current().spec(specId);
+ // Update summaries for all added delete files including eq.
deletes for this partition
+ // spec
+ deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec,
file));
List<ManifestFile> newDeleteManifests =
writeDeleteManifests(deleteFiles, spec);
cachedNewDeleteManifests.addAll(newDeleteManifests);
});
this.hasNewDeleteFiles = false;
+ this.duplicateDVsForDataFile.clear();
}
return cachedNewDeleteManifests;
}
+ // Find duplicate DVs for a given partition spec, and return a Pair of the
new DVs and the DVs
+ // that were merged
+ private Map<String, DeleteFile> mergeDuplicateDVs() {
+ Map<String, DeleteFile> mergedDVs = Maps.newConcurrentMap();
+ Tasks.foreach(duplicateDVsForDataFile.entrySet())
+ .executeWith(ThreadPools.getDeleteWorkerPool())
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .run(
+ dvsToMergeForDataFile -> {
+ String referencedLocation = dvsToMergeForDataFile.getKey();
+ mergedDVs.put(
+ referencedLocation,
+ mergeAndWriteDV(referencedLocation,
dvsToMergeForDataFile.getValue()));
+ });
+
+ return mergedDVs;
+ }
+
+ private DeleteFile mergeAndWriteDV(String referencedDataFile, DeleteFileSet
dvs) {
+ Iterator<DeleteFile> dvIterator = dvs.iterator();
+ DeleteFile firstDV = dvIterator.next();
+ PositionDeleteIndex positionDeleteIndex =
+ Deletes.readDV(firstDV, ops().io(), ops().encryption());
+ PartitionSpec spec = spec(firstDV.specId());
+ while (dvIterator.hasNext()) {
+ DeleteFile dv = dvIterator.next();
+ Preconditions.checkArgument(
+ Objects.equals(dv.dataSequenceNumber(),
firstDV.dataSequenceNumber()),
+ "Cannot merge duplicate added DVs when data sequence numbers are
different,"
+ + "expected all to be added with sequence %s, but got %s",
+ firstDV.dataSequenceNumber(),
+ dv.dataSequenceNumber());
+ Preconditions.checkArgument(
+ dv.specId() == firstDV.specId(),
+ "Cannot merge duplicate added DVs when partition specs are
different,"
+ + "expected all to be added with spec %s, but got %s",
+ firstDV.specId(),
+ dv.specId());
+ Preconditions.checkArgument(
+ Objects.equals(dv.partition(), firstDV.partition()),
+ "Cannot merge duplicate added DVs when partition tuples are
different");
+ positionDeleteIndex.merge(Deletes.readDV(dv, ops().io(),
ops().encryption()));
+ }
+
+ return writeDV(
+ referencedDataFile,
+ positionDeleteIndex,
+ spec,
+ firstDV.partition(),
+ firstDV.dataSequenceNumber());
+ }
+
+ private DeleteFile writeDV(
Review Comment:
I take back what I said; I'll leave this as is for now unless there are
strong opinions here. It's a classic tradeoff between parallelism and file
sizes.
If we work from the assumption that there won't be too many data files with
duplicate DVs, then there won't be too many small Puffin files created. From a
metadata perspective it doesn't really matter, because the number of entries in
the delete manifests is the same after merging; the concern is mainly the
storage I/O cost. But again, if there aren't too many data files with
duplicates, that cost is probably negligible.
Trying to coalesce all the DVs into a single Puffin file just complicates
the logic even more, and personally I'd prefer to keep the logic simpler until
we know it's really a problem. In the end, we could run rewrite position
deletes to better colocate the blobs. And again, considering this fix-up is not
expected to run in the average case, it doesn't seem worth complicating the
logic for it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]