singhpk234 commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2678879058
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1063,16 +1112,121 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
newDeleteFilesBySpec.forEach(
(specId, deleteFiles) -> {
PartitionSpec spec = ops().current().spec(specId);
+ if (foundDuplicateDVs) {
+ mergeDVsAndUpdateDeleteFiles(deleteFiles, spec);
+ }
+
+ // Update summaries for all delete files including eq. deletes for this partition spec
+ deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file));
+
List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
cachedNewDeleteManifests.addAll(newDeleteManifests);
});
this.hasNewDeleteFiles = false;
+ this.foundDuplicateDVs = false;
}
return cachedNewDeleteManifests;
}
+ private void mergeDVsAndUpdateDeleteFiles(DeleteFileSet deleteFiles, PartitionSpec spec) {
+ // Filter out DVs and group them by referenced data file
+ Map<String, List<DeleteFile>> dvsByReferencedLocation =
+ deleteFiles.stream()
+ .filter(ContentFileUtil::isDV)
+ .collect(
+ Collectors.toMap(
+ DeleteFile::referencedDataFile,
+ Lists::newArrayList,
+ (existingDVs, newDVs) -> {
+ existingDVs.addAll(newDVs);
+ return existingDVs;
+ }));
+
+ // Merge DVs
+ Map<String, List<DeleteFile>> dvsThatNeedMerging =
+ dvsByReferencedLocation.entrySet().stream()
+ .filter(e -> e.getValue().size() > 1)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ List<DeleteFile> newDVs = Lists.newArrayList();
+ Tasks.foreach(dvsThatNeedMerging.entrySet())
+ .executeWith(ThreadPools.getDeleteWorkerPool())
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .run(
+ dvsToMergeForDataFile ->
+ newDVs.add(
+ mergeAndWriteDV(
+ dvsToMergeForDataFile.getKey(), dvsToMergeForDataFile.getValue(), spec)));
+
+ // Remove the merged DVs from the tracking set
+ for (List<DeleteFile> dvsThatWereMerged : dvsThatNeedMerging.values()) {
+ dvsThatWereMerged.forEach(deleteFiles::remove);
+ }
+
+ // Add the new DVs to the tracking set
+ deleteFiles.addAll(newDVs);
+ }
+
+ private DeleteFile mergeAndWriteDV(
+ String referencedDataFile, List<DeleteFile> dvs, PartitionSpec spec) {
+ DeleteFile firstDV = dvs.get(0);
+ PositionDeleteIndex positionDeleteIndex =
+ Deletes.readDV(firstDV, ops().io(), ops().encryption());
+ for (int i = 1; i < dvs.size(); i++) {
+ DeleteFile dv = dvs.get(i);
+ Preconditions.checkArgument(
+ Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()),
+ "Cannot merge duplicate added DVs when data sequence numbers are different,"
+ + "expected all to be added with sequence %s, but got %s",
Review Comment:
Is this only within a single write? Are we worried about compaction cases, or is this just a safety net?
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1063,16 +1112,121 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
newDeleteFilesBySpec.forEach(
(specId, deleteFiles) -> {
PartitionSpec spec = ops().current().spec(specId);
+ if (foundDuplicateDVs) {
+ mergeDVsAndUpdateDeleteFiles(deleteFiles, spec);
+ }
+
+ // Update summaries for all delete files including eq. deletes for this partition spec
+ deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file));
+
List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
cachedNewDeleteManifests.addAll(newDeleteManifests);
});
this.hasNewDeleteFiles = false;
+ this.foundDuplicateDVs = false;
}
return cachedNewDeleteManifests;
}
+ private void mergeDVsAndUpdateDeleteFiles(DeleteFileSet deleteFiles, PartitionSpec spec) {
+ // Filter out DVs and group them by referenced data file
+ Map<String, List<DeleteFile>> dvsByReferencedLocation =
+ deleteFiles.stream()
+ .filter(ContentFileUtil::isDV)
+ .collect(
+ Collectors.toMap(
+ DeleteFile::referencedDataFile,
+ Lists::newArrayList,
+ (existingDVs, newDVs) -> {
+ existingDVs.addAll(newDVs);
+ return existingDVs;
+ }));
+
+ // Merge DVs
+ Map<String, List<DeleteFile>> dvsThatNeedMerging =
+ dvsByReferencedLocation.entrySet().stream()
+ .filter(e -> e.getValue().size() > 1)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ List<DeleteFile> newDVs = Lists.newArrayList();
+ Tasks.foreach(dvsThatNeedMerging.entrySet())
+ .executeWith(ThreadPools.getDeleteWorkerPool())
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .run(
+ dvsToMergeForDataFile ->
+ newDVs.add(
+ mergeAndWriteDV(
+ dvsToMergeForDataFile.getKey(), dvsToMergeForDataFile.getValue(), spec)));
+
+ // Remove the merged DVs from the tracking set
+ for (List<DeleteFile> dvsThatWereMerged : dvsThatNeedMerging.values()) {
+ dvsThatWereMerged.forEach(deleteFiles::remove);
+ }
+
+ // Add the new DVs to the tracking set
+ deleteFiles.addAll(newDVs);
+ }
+
+ private DeleteFile mergeAndWriteDV(
+ String referencedDataFile, List<DeleteFile> dvs, PartitionSpec spec) {
+ DeleteFile firstDV = dvs.get(0);
+ PositionDeleteIndex positionDeleteIndex =
+ Deletes.readDV(firstDV, ops().io(), ops().encryption());
+ for (int i = 1; i < dvs.size(); i++) {
+ DeleteFile dv = dvs.get(i);
+ Preconditions.checkArgument(
+ Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()),
+ "Cannot merge duplicate added DVs when data sequence numbers are different,"
+ + "expected all to be added with sequence %s, but got %s",
+ firstDV.dataSequenceNumber(),
+ dv.dataSequenceNumber());
+
+ Preconditions.checkArgument(
+ Objects.equals(dv.partition(), firstDV.partition()),
+ "Cannot merge duplicate added DVs when partition tuples are
different");
+ positionDeleteIndex.merge(Deletes.readDV(dvs.get(i), ops().io(), ops().encryption()));
+ }
+
+ return writeDV(
+ referencedDataFile,
+ positionDeleteIndex,
+ spec,
+ firstDV.partition(),
+ firstDV.dataSequenceNumber());
+ }
+
+ private DeleteFile writeDV(
+ String referencedDataFile,
+ PositionDeleteIndex positionDeleteIndex,
+ PartitionSpec spec,
+ StructLike partition,
+ Long dataSequenceNumber) {
+ try {
+ DVFileWriter dvFileWriter =
+ new BaseDVFileWriter(
+ OutputFileFactory.builderFor(
+ ops().locationProvider(),
+ ops().encryption(),
+ ops()::io,
+ spec,
+ FileFormat.PUFFIN,
+ 1,
+ 1)
+ .build(),
+ path -> null);
+ dvFileWriter.delete(referencedDataFile, positionDeleteIndex, spec, partition);
+ dvFileWriter.close();
Review Comment:
I believe that even if something fails during the delete, we would still want to call `dvFileWriter.close()`? But since we need to call close before we can access the result, I guess the best we can do is check whether the writer was closed successfully and, if not, try closing it in a finally block?
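
For illustration, a minimal sketch of the try/finally shape being discussed. It assumes `DVFileWriter` exposes the `delete(...)` overload used in the diff plus a `result()` accessor that is only valid after `close()`, with package paths taken from the core module; treat these as assumptions, not a confirmed fix for the PR.

```java
import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.DVFileWriter;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.io.DeleteWriteResult;

class DVCloseSketch {
  // Sketch only: close the writer even when delete() throws, while still
  // reading result() only after a successful close().
  DeleteWriteResult writeAndClose(
      DVFileWriter writer,
      String referencedDataFile,
      PositionDeleteIndex positions,
      PartitionSpec spec,
      StructLike partition)
      throws IOException {
    try {
      // same call the diff makes; this may fail before the writer is closed
      writer.delete(referencedDataFile, positions, spec, partition);
    } finally {
      // always release the underlying Puffin file, even on failure
      writer.close();
    }
    // assumed to be valid only once close() has completed
    return writer.result();
  }
}
```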
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1063,16 +1112,121 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
newDeleteFilesBySpec.forEach(
(specId, deleteFiles) -> {
PartitionSpec spec = ops().current().spec(specId);
+ if (foundDuplicateDVs) {
+ mergeDVsAndUpdateDeleteFiles(deleteFiles, spec);
+ }
+
+ // Update summaries for all delete files including eq. deletes for this partition spec
+ deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file));
+
List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
cachedNewDeleteManifests.addAll(newDeleteManifests);
});
this.hasNewDeleteFiles = false;
+ this.foundDuplicateDVs = false;
}
return cachedNewDeleteManifests;
}
+ private void mergeDVsAndUpdateDeleteFiles(DeleteFileSet deleteFiles, PartitionSpec spec) {
+ // Filter out DVs and group them by referenced data file
+ Map<String, List<DeleteFile>> dvsByReferencedLocation =
+ deleteFiles.stream()
+ .filter(ContentFileUtil::isDV)
+ .collect(
+ Collectors.toMap(
+ DeleteFile::referencedDataFile,
+ Lists::newArrayList,
+ (existingDVs, newDVs) -> {
+ existingDVs.addAll(newDVs);
+ return existingDVs;
+ }));
+
+ // Merge DVs
+ Map<String, List<DeleteFile>> dvsThatNeedMerging =
+ dvsByReferencedLocation.entrySet().stream()
+ .filter(e -> e.getValue().size() > 1)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ List<DeleteFile> newDVs = Lists.newArrayList();
Review Comment:
```suggestion
List<DeleteFile> newDVs = Collections.synchronizedList(Lists.newArrayList());
```
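
To make the race this suggestion guards against concrete, here is a small self-contained illustration using a stand-in thread pool; the PR itself collects results from callbacks run via `Tasks.foreach(...).executeWith(ThreadPools.getDeleteWorkerPool())`, and either `Collections.synchronizedList` as suggested or a concurrent queue as below avoids unsynchronized `add` calls on a plain `ArrayList`.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

class ConcurrentCollectSketch {
  public static void main(String[] args) throws InterruptedException {
    // stand-in for the delete worker pool that executeWith(...) uses
    ExecutorService pool = Executors.newFixedThreadPool(4);

    // a plain ArrayList would race under concurrent add() calls from worker threads;
    // ConcurrentLinkedQueue (or Collections.synchronizedList) is safe
    Queue<Integer> mergedDVs = new ConcurrentLinkedQueue<>();
    IntStream.range(0, 1000).forEach(i -> pool.submit(() -> mergedDVs.add(i)));

    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);

    // snapshot back into a List once every task has finished
    List<Integer> results = List.copyOf(mergedDVs);
    System.out.println(results.size()); // 1000
  }
}
```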
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1063,16 +1112,121 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
newDeleteFilesBySpec.forEach(
(specId, deleteFiles) -> {
PartitionSpec spec = ops().current().spec(specId);
+ if (foundDuplicateDVs) {
+ mergeDVsAndUpdateDeleteFiles(deleteFiles, spec);
+ }
+
+ // Update summaries for all delete files including eq. deletes for this partition spec
+ deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file));
+
List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
cachedNewDeleteManifests.addAll(newDeleteManifests);
});
this.hasNewDeleteFiles = false;
+ this.foundDuplicateDVs = false;
}
return cachedNewDeleteManifests;
}
+ private void mergeDVsAndUpdateDeleteFiles(DeleteFileSet deleteFiles, PartitionSpec spec) {
+ // Filter out DVs and group them by referenced data file
+ Map<String, List<DeleteFile>> dvsByReferencedLocation =
+ deleteFiles.stream()
+ .filter(ContentFileUtil::isDV)
+ .collect(
+ Collectors.toMap(
+ DeleteFile::referencedDataFile,
+ Lists::newArrayList,
+ (existingDVs, newDVs) -> {
+ existingDVs.addAll(newDVs);
+ return existingDVs;
+ }));
+
+ // Merge DVs
+ Map<String, List<DeleteFile>> dvsThatNeedMerging =
+ dvsByReferencedLocation.entrySet().stream()
+ .filter(e -> e.getValue().size() > 1)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ List<DeleteFile> newDVs = Lists.newArrayList();
+ Tasks.foreach(dvsThatNeedMerging.entrySet())
+ .executeWith(ThreadPools.getDeleteWorkerPool())
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .run(
+ dvsToMergeForDataFile ->
+ newDVs.add(
+ mergeAndWriteDV(
+ dvsToMergeForDataFile.getKey(), dvsToMergeForDataFile.getValue(), spec)));
+
+ // Remove the merged DVs from the tracking set
+ for (List<DeleteFile> dvsThatWereMerged : dvsThatNeedMerging.values()) {
+ dvsThatWereMerged.forEach(deleteFiles::remove);
+ }
+
+ // Add the new DVs to the tracking set
+ deleteFiles.addAll(newDVs);
+ }
+
+ private DeleteFile mergeAndWriteDV(
+ String referencedDataFile, List<DeleteFile> dvs, PartitionSpec spec) {
+ DeleteFile firstDV = dvs.get(0);
+ PositionDeleteIndex positionDeleteIndex =
+ Deletes.readDV(firstDV, ops().io(), ops().encryption());
+ for (int i = 1; i < dvs.size(); i++) {
+ DeleteFile dv = dvs.get(i);
+ Preconditions.checkArgument(
+ Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()),
+ "Cannot merge duplicate added DVs when data sequence numbers are different,"
+ + "expected all to be added with sequence %s, but got %s",
+ firstDV.dataSequenceNumber(),
+ dv.dataSequenceNumber());
+
+ Preconditions.checkArgument(
+ Objects.equals(dv.partition(), firstDV.partition()),
+ "Cannot merge duplicate added DVs when partition tuples are
different");
+ positionDeleteIndex.merge(Deletes.readDV(dvs.get(i), ops().io(), ops().encryption()));
Review Comment:
I wonder: let's say DV1 and DV2 both set the same position in the vector. Is this going to be an illegal state, given that merging will simply union them?
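
For what it's worth, a tiny standalone sketch of the union semantics in question, using `java.util.BitSet` as a stand-in for the position bitmap rather than the actual `PositionDeleteIndex`: it only shows that a union collapses an overlapping position into a single set bit, not whether Iceberg treats such an overlap as legal.

```java
import java.util.BitSet;

class DVUnionSketch {
  public static void main(String[] args) {
    // BitSet stands in for a DV's position bitmap; this is not PositionDeleteIndex.
    BitSet dv1 = new BitSet();
    dv1.set(3);
    dv1.set(5); // position 5 deleted by DV1

    BitSet dv2 = new BitSet();
    dv2.set(5); // position 5 also deleted by DV2
    dv2.set(9);

    dv1.or(dv2); // union: the overlapping position is set exactly once
    System.out.println(dv1); // {3, 5, 9}
  }
}
```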
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]