amogh-jahagirdar commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2678924288
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1063,16 +1112,121 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
       newDeleteFilesBySpec.forEach(
           (specId, deleteFiles) -> {
             PartitionSpec spec = ops().current().spec(specId);
+            if (foundDuplicateDVs) {
+              mergeDVsAndUpdateDeleteFiles(deleteFiles, spec);
+            }
+
+            // Update summaries for all delete files including eq. deletes for this partition spec
+            deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file));
+
             List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
             cachedNewDeleteManifests.addAll(newDeleteManifests);
           });
       this.hasNewDeleteFiles = false;
+      this.foundDuplicateDVs = false;
     }

     return cachedNewDeleteManifests;
   }
+
+  private void mergeDVsAndUpdateDeleteFiles(DeleteFileSet deleteFiles, PartitionSpec spec) {
+    // Filter out DVs and group them by referenced data file
+    Map<String, List<DeleteFile>> dvsByReferencedLocation =
+        deleteFiles.stream()
+            .filter(ContentFileUtil::isDV)
+            .collect(
+                Collectors.toMap(
+                    DeleteFile::referencedDataFile,
+                    Lists::newArrayList,
+                    (existingDVs, newDVs) -> {
+                      existingDVs.addAll(newDVs);
+                      return existingDVs;
+                    }));
+
+    // Merge DVs
+    Map<String, List<DeleteFile>> dvsThatNeedMerging =
+        dvsByReferencedLocation.entrySet().stream()
+            .filter(e -> e.getValue().size() > 1)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    List<DeleteFile> newDVs = Lists.newArrayList();
+    Tasks.foreach(dvsThatNeedMerging.entrySet())
+        .executeWith(ThreadPools.getDeleteWorkerPool())
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(
+            dvsToMergeForDataFile ->
+                newDVs.add(
+                    mergeAndWriteDV(
+                        dvsToMergeForDataFile.getKey(), dvsToMergeForDataFile.getValue(), spec)));
+
+    // Remove the merged DVs from the tracking set
+    for (List<DeleteFile> dvsThatWereMerged : dvsThatNeedMerging.values()) {
+      dvsThatWereMerged.forEach(deleteFiles::remove);
+    }
+
+    // Add the new DVs to the tracking set
+    deleteFiles.addAll(newDVs);
+  }
+
+  private DeleteFile mergeAndWriteDV(
+      String referencedDataFile, List<DeleteFile> dvs, PartitionSpec spec) {
+    DeleteFile firstDV = dvs.get(0);
+    PositionDeleteIndex positionDeleteIndex =
+        Deletes.readDV(firstDV, ops().io(), ops().encryption());
+    for (int i = 1; i < dvs.size(); i++) {
+      DeleteFile dv = dvs.get(i);
+      Preconditions.checkArgument(
+          Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()),
+          "Cannot merge duplicate added DVs when data sequence numbers are different, "
+              + "expected all to be added with sequence %s, but got %s",
+          firstDV.dataSequenceNumber(),
+          dv.dataSequenceNumber());
+
+      Preconditions.checkArgument(
+          Objects.equals(dv.partition(), firstDV.partition()),
+          "Cannot merge duplicate added DVs when partition tuples are different");
+      positionDeleteIndex.merge(Deletes.readDV(dv, ops().io(), ops().encryption()));
Review Comment:
Hm, I think in the context of merging within the same commit, it's not illegal.
If DV1 has positions 1, 2, and 3 set, and DV2 for some reason has positions 3 and 4 set, then since this is all within the same commit it feels reasonable to me to just produce 1, 2, 3, and 4.
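For illustration, here's a minimal sketch of that union using plain roaring bitmaps (purely illustrative: the actual merge goes through `PositionDeleteIndex#merge`, and real DVs index 64-bit positions):

```java
import org.roaringbitmap.RoaringBitmap;

public class DvUnionSketch {
  public static void main(String[] args) {
    // positions deleted by two DVs added for the same data file in one commit
    RoaringBitmap dv1 = RoaringBitmap.bitmapOf(1, 2, 3);
    RoaringBitmap dv2 = RoaringBitmap.bitmapOf(3, 4);

    dv1.or(dv2); // in-place union; the shared position 3 simply collapses
    System.out.println(dv1); // {1,2,3,4}
  }
}
```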
I think where this would make a difference is if we wanted to do merging on conflict resolution; in that case we do need to care that a concurrently added DV doesn't delete the same position, to ensure serializable guarantees.
But I would argue that's different from what we're talking about here, which is reconciliation within the same commit.
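If we ever go down that path, the validation would be an intersection check along these lines (again just a sketch with the 32-bit RoaringBitmap API, not the actual conflict-detection code):

```java
import org.roaringbitmap.RoaringBitmap;

public class DvConflictSketch {
  public static void main(String[] args) {
    RoaringBitmap committed = RoaringBitmap.bitmapOf(1, 2, 3); // DV from a concurrent commit
    RoaringBitmap pending = RoaringBitmap.bitmapOf(3, 4);      // DV this commit is adding

    // A position deleted by both sides breaks serializable isolation,
    // so the pending commit must fail instead of merging.
    if (RoaringBitmap.intersects(committed, pending)) {
      throw new IllegalStateException("Concurrent DVs delete the same position");
    }
  }
}
```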
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]