amogh-jahagirdar commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2678915957


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1063,16 +1112,121 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
       newDeleteFilesBySpec.forEach(
           (specId, deleteFiles) -> {
             PartitionSpec spec = ops().current().spec(specId);
+            if (foundDuplicateDVs) {
+              mergeDVsAndUpdateDeleteFiles(deleteFiles, spec);
+            }
+
+            // Update summaries for all delete files including eq. deletes for this partition spec
+            deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file));
+
             List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
             cachedNewDeleteManifests.addAll(newDeleteManifests);
           });
 
       this.hasNewDeleteFiles = false;
+      this.foundDuplicateDVs = false;
     }
 
     return cachedNewDeleteManifests;
   }
 
+  private void mergeDVsAndUpdateDeleteFiles(DeleteFileSet deleteFiles, PartitionSpec spec) {
+    // Filter out DVs and group them by referenced data file
+    Map<String, List<DeleteFile>> dvsByReferencedLocation =
+        deleteFiles.stream()
+            .filter(ContentFileUtil::isDV)
+            .collect(
+                Collectors.toMap(
+                    DeleteFile::referencedDataFile,
+                    Lists::newArrayList,
+                    (existingDVs, newDVs) -> {
+                      existingDVs.addAll(newDVs);
+                      return existingDVs;
+                    }));
+
+    // Merge DVs
+    Map<String, List<DeleteFile>> dvsThatNeedMerging =
+        dvsByReferencedLocation.entrySet().stream()
+            .filter(e -> e.getValue().size() > 1)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    List<DeleteFile> newDVs = Lists.newArrayList();

Review Comment:
   Yeah, I was changing the algorithm to be more like what I mentioned above and realized it would throw a ConcurrentModificationException: since we're now parallelizing across the impacted referenced files, the grouping map has to be a concurrent map. Let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
