amogh-jahagirdar commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2688043370


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1060,19 +1096,113 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
     }
 
     if (cachedNewDeleteManifests.isEmpty()) {
+      // Found duplicates, merge them and update newDeleteFilesBySpec to remove duplicates and add
+      // the new merged one
+      if (!duplicateDVsForDataFile.isEmpty()) {
+        Map<String, DeleteFile> mergedDVs = mergeDuplicateDVs();
+        mergedDVs.forEach(
+            (referencedFile, newDV) -> {
+              DeleteFileSet duplicateDVs = duplicateDVsForDataFile.get(referencedFile);
+              DeleteFileSet allDeleteFilesForSpec = newDeleteFilesBySpec.get(newDV.specId());
+              LOG.warn(
+                  "Merged {} duplicate deletion vectors for data file {} in 
table {}. The merged DVs are orphaned, and writers should merge DVs per file 
before committing",
+                  duplicateDVs.size(),
+                  referencedFile,
+                  tableName);
+              allDeleteFilesForSpec.removeAll(duplicateDVs);
+              allDeleteFilesForSpec.add(newDV);
+            });
+      }
+
       newDeleteFilesBySpec.forEach(
           (specId, deleteFiles) -> {
             PartitionSpec spec = ops().current().spec(specId);
+            deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file));
            List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
             cachedNewDeleteManifests.addAll(newDeleteManifests);
           });
 
       this.hasNewDeleteFiles = false;
+      this.duplicateDVsForDataFile.clear();
     }
 
     return cachedNewDeleteManifests;
   }
 
+  // Returns the merged DVs, keyed by referenced data file location, for data files that had duplicate DVs
+  private Map<String, DeleteFile> mergeDuplicateDVs() {
+    Map<String, DeleteFile> mergedDVs = Maps.newConcurrentMap();
+    Tasks.foreach(duplicateDVsForDataFile.entrySet())
+        .executeWith(ThreadPools.getDeleteWorkerPool())
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(
+            dvsToMergeForDataFile -> {
+              String referencedLocation = dvsToMergeForDataFile.getKey();
+              mergedDVs.put(
+                  referencedLocation,
+                  mergeAndWriteDV(referencedLocation, dvsToMergeForDataFile.getValue()));
+            });
+
+    return mergedDVs;
+  }
+
+  // Merges the set of DVs for a given referenced file into a single DV
+  // and produces a single Puffin file
+  private DeleteFile mergeAndWriteDV(String referencedDataFile, DeleteFileSet dvs) {
+    Iterator<DeleteFile> dvIterator = dvs.iterator();
+    DeleteFile firstDV = dvIterator.next();
+    PositionDeleteIndex positionDeleteIndex =
+        Deletes.readDV(firstDV, ops().io(), ops().encryption());
+    PartitionSpec spec = spec(firstDV.specId());
+    while (dvIterator.hasNext()) {
+      DeleteFile dv = dvIterator.next();
+      Preconditions.checkArgument(
+          Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()),
+          "Cannot merge duplicate added DVs when data sequence numbers are different,"
+              + " expected all to be added with sequence %s, but got %s",
+          firstDV.dataSequenceNumber(),
+          dv.dataSequenceNumber());
+      Preconditions.checkArgument(
+          dv.specId() == firstDV.specId(),
+          "Cannot merge duplicate added DVs when partition specs are 
different,"
+              + "expected all to be added with spec %s, but got %s",
+          firstDV.specId(),
+          dv.specId());
+      Preconditions.checkArgument(
+          Objects.equals(dv.partition(), firstDV.partition()),
+          "Cannot merge duplicate added DVs when partition tuples are 
different");
+      positionDeleteIndex.merge(Deletes.readDV(dv, ops().io(), ops().encryption()));
+    }
+
+    return writeDV(
+        referencedDataFile,
+        positionDeleteIndex,
+        spec,
+        firstDV.partition(),
+        firstDV.dataSequenceNumber());
+  }
+
+  private DeleteFile writeDV(
+      String referencedDataFile,
+      PositionDeleteIndex positionDeleteIndex,
+      PartitionSpec spec,
+      StructLike partition,
+      Long dataSequenceNumber) {
+    try (DVFileWriter dvFileWriter =

Review Comment:
   Yeah, see my comment here: https://github.com/apache/iceberg/pull/15006#discussion_r2680201703
   
   I was trying to avoid the additional complication of coalescing all the DVs: they may have been produced with different specs, and the writer is bound to a single spec, so after merging we would have to do another round of grouping before writing.
   
   It's definitely doable though, I just wasn't sure it was worth the complication since we expect this case to be rare.
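   
   For illustration only, here's a rough sketch (not part of this PR) of the extra regrouping step that coalescing DVs across specs would require; `writeDVsForSpec` is a hypothetical helper standing in for a spec-bound writer, while `DeleteFileSet`, `Maps`, and `spec(int)` are the same helpers the PR already uses:
   
   ```java
   // Hypothetical sketch: after merging DVs per data file across specs, the merged
   // DVs would still need to be regrouped by spec, because each delete writer /
   // manifest writer is bound to a single PartitionSpec.
   Map<Integer, DeleteFileSet> mergedBySpec = Maps.newHashMap();
   for (DeleteFile mergedDV : mergedDVs.values()) {
     mergedBySpec
         .computeIfAbsent(mergedDV.specId(), id -> DeleteFileSet.create())
         .add(mergedDV);
   }
   
   mergedBySpec.forEach(
       (specId, dvsForSpec) -> {
         PartitionSpec spec = spec(specId);
         writeDVsForSpec(spec, dvsForSpec); // hypothetical spec-bound write
       });
   ```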
   
   



