amogh-jahagirdar commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2695645099


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1073,6 +1088,139 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
     return cachedNewDeleteManifests;
   }
 
+  // Merge duplicates, internally takes care of updating newDeleteFilesBySpec to remove
+  // duplicates and add the newly merged DV
+  private void mergeDVsAndWrite() {
+    Map<Integer, List<MergedDVContent>> mergedIndicesBySpec = Maps.newConcurrentMap();
+
+    Tasks.foreach(dvsByReferencedFile.entrySet())
+        .executeWith(ThreadPools.getDeleteWorkerPool())
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(
+            entry -> {
+              String referencedLocation = entry.getKey();
+              DeleteFileSet dvsToMerge = entry.getValue();
+              // Nothing to merge
+              if (dvsToMerge.size() < 2) {
+                return;
+              }
+
+              MergedDVContent merged = mergePositions(referencedLocation, dvsToMerge);
+
+              mergedIndicesBySpec
+                  .computeIfAbsent(
+                      merged.specId, spec -> 
Collections.synchronizedList(Lists.newArrayList()))
+                  .add(merged);
+            });
+
+    // Update newDeleteFilesBySpec to remove all the duplicates
+    mergedIndicesBySpec.forEach(
+        (specId, mergedDVContent) -> {
+          mergedDVContent.stream()
+              .map(content -> content.mergedDVs)
+              .forEach(duplicateDVs -> newDeleteFilesBySpec.get(specId).removeAll(duplicateDVs));
+        });
+
+    writeMergedDVs(mergedIndicesBySpec);
+  }
+
+  // Produces a Puffin per partition spec containing the merged DVs for that spec

Review Comment:
   Discussed offline: since there's generally going to be a single partition spec, we're OK here and can keep the existing behavior. cc @rdblue 
   
   I did double-check: while the OutputFileFactory requires a spec to be passed in, the DVFileWriter isn't really bound to a spec (as expected). The DVFileWriter uses a newLocation() API which doesn't care about the spec or any partition tuples (again as expected).
   
   So I think the dummy partition spec idea I had would work if we want to further simplify all this grouping logic.
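
   For readers outside the PR context, the grouping pattern in the diff (a concurrent map whose values are synchronized lists, populated in parallel from a worker pool, then drained on a single thread) can be sketched with plain JDK types. `Merged` here is a hypothetical stand-in for `MergedDVContent`, and `parallelStream()` stands in for `Tasks.foreach(...).executeWith(pool)`; neither is the actual Iceberg API.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   public class GroupBySpecSketch {
     // Hypothetical stand-in for MergedDVContent: spec id plus the referenced data file.
     record Merged(int specId, String referencedFile) {}
   
     public static void main(String[] args) {
       // Concurrent map + synchronized list values, so concurrent
       // computeIfAbsent/add calls from worker threads are safe.
       Map<Integer, List<Merged>> mergedBySpec = new ConcurrentHashMap<>();
   
       List<Merged> inputs =
           List.of(
               new Merged(0, "data-1.parquet"),
               new Merged(0, "data-2.parquet"),
               new Merged(1, "data-3.parquet"));
   
       // Stand-in for the Tasks.foreach worker pool in the diff.
       inputs.parallelStream()
           .forEach(
               m ->
                   mergedBySpec
                       .computeIfAbsent(
                           m.specId(), id -> Collections.synchronizedList(new ArrayList<>()))
                       .add(m));
   
       // Single-threaded drain, mirroring the forEach over mergedIndicesBySpec.
       mergedBySpec.forEach(
           (specId, merged) -> System.out.println("spec " + specId + ": " + merged.size() + " merged DVs"));
     }
   }
   ```
   
   With a dummy partition spec, the map would collapse to a single key and the grouping step disappears, which is the simplification suggested above.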



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

