This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bd314bdf6 Core: Support DVs in DeleteFileIndex (#11467)
5bd314bdf6 is described below

commit 5bd314bdf6c3b5e0e5346d0f7408353bdf31bc81
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Nov 5 15:52:24 2024 +0100

    Core: Support DVs in DeleteFileIndex (#11467)
---
 .../java/org/apache/iceberg/DeleteFileIndex.java   | 62 +++++++++++++++++++---
 .../org/apache/iceberg/util/ContentFileUtil.java   | 14 +++++
 .../apache/iceberg/DeleteFileIndexTestBase.java    | 52 ++++++++++++++++++
 3 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 8444b91eec..ab7fec6fb1 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -33,6 +33,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ManifestEvaluator;
@@ -70,6 +71,7 @@ class DeleteFileIndex {
   private final PartitionMap<EqualityDeletes> eqDeletesByPartition;
   private final PartitionMap<PositionDeletes> posDeletesByPartition;
   private final Map<String, PositionDeletes> posDeletesByPath;
+  private final Map<String, DeleteFile> dvByPath;
   private final boolean hasEqDeletes;
   private final boolean hasPosDeletes;
   private final boolean isEmpty;
@@ -78,13 +80,16 @@ class DeleteFileIndex {
       EqualityDeletes globalDeletes,
       PartitionMap<EqualityDeletes> eqDeletesByPartition,
       PartitionMap<PositionDeletes> posDeletesByPartition,
-      Map<String, PositionDeletes> posDeletesByPath) {
+      Map<String, PositionDeletes> posDeletesByPath,
+      Map<String, DeleteFile> dvByPath) {
     this.globalDeletes = globalDeletes;
     this.eqDeletesByPartition = eqDeletesByPartition;
     this.posDeletesByPartition = posDeletesByPartition;
     this.posDeletesByPath = posDeletesByPath;
+    this.dvByPath = dvByPath;
     this.hasEqDeletes = globalDeletes != null || eqDeletesByPartition != null;
-    this.hasPosDeletes = posDeletesByPartition != null || posDeletesByPath != null;
+    this.hasPosDeletes =
+        posDeletesByPartition != null || posDeletesByPath != null || dvByPath != null;
     this.isEmpty = !hasEqDeletes && !hasPosDeletes;
   }
 
@@ -125,6 +130,10 @@ class DeleteFileIndex {
       }
     }
 
+    if (dvByPath != null) {
+      deleteFiles = Iterables.concat(deleteFiles, dvByPath.values());
+    }
+
     return deleteFiles;
   }
 
@@ -143,9 +152,16 @@ class DeleteFileIndex {
 
     DeleteFile[] global = findGlobalDeletes(sequenceNumber, file);
     DeleteFile[] eqPartition = findEqPartitionDeletes(sequenceNumber, file);
-    DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber, file);
-    DeleteFile[] posPath = findPathDeletes(sequenceNumber, file);
-    return concat(global, eqPartition, posPartition, posPath);
+    DeleteFile dv = findDV(sequenceNumber, file);
+    if (dv != null && global == null && eqPartition == null) {
+      return new DeleteFile[] {dv};
+    } else if (dv != null) {
+      return concat(global, eqPartition, new DeleteFile[] {dv});
+    } else {
+      DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber, file);
+      DeleteFile[] posPath = findPathDeletes(sequenceNumber, file);
+      return concat(global, eqPartition, posPartition, posPath);
+    }
   }
 
   private DeleteFile[] findGlobalDeletes(long seq, DataFile dataFile) {
@@ -180,6 +196,22 @@ class DeleteFileIndex {
     return deletes == null ? EMPTY_DELETES : deletes.filter(seq);
   }
 
+  private DeleteFile findDV(long seq, DataFile dataFile) {
+    if (dvByPath == null) {
+      return null;
+    }
+
+    DeleteFile dv = dvByPath.get(dataFile.location());
+    if (dv != null) {
+      ValidationException.check(
+          dv.dataSequenceNumber() >= seq,
+          "DV data sequence number (%s) must be greater than or equal to data file sequence number (%s)",
+          dv.dataSequenceNumber(),
+          seq);
+    }
+    return dv;
+  }
+
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   private static boolean canContainEqDeletesForFile(
       DataFile dataFile, EqualityDeleteFile deleteFile) {
@@ -434,11 +466,16 @@ class DeleteFileIndex {
       PartitionMap<EqualityDeletes> eqDeletesByPartition = PartitionMap.create(specsById);
       PartitionMap<PositionDeletes> posDeletesByPartition = PartitionMap.create(specsById);
       Map<String, PositionDeletes> posDeletesByPath = Maps.newHashMap();
+      Map<String, DeleteFile> dvByPath = Maps.newHashMap();
 
       for (DeleteFile file : files) {
         switch (file.content()) {
           case POSITION_DELETES:
-            add(posDeletesByPath, posDeletesByPartition, file);
+            if (ContentFileUtil.isDV(file)) {
+              add(dvByPath, file);
+            } else {
+              add(posDeletesByPath, posDeletesByPartition, file);
+            }
             break;
           case EQUALITY_DELETES:
             add(globalDeletes, eqDeletesByPartition, file);
@@ -453,7 +490,18 @@ class DeleteFileIndex {
           globalDeletes.isEmpty() ? null : globalDeletes,
           eqDeletesByPartition.isEmpty() ? null : eqDeletesByPartition,
           posDeletesByPartition.isEmpty() ? null : posDeletesByPartition,
-          posDeletesByPath.isEmpty() ? null : posDeletesByPath);
+          posDeletesByPath.isEmpty() ? null : posDeletesByPath,
+          dvByPath.isEmpty() ? null : dvByPath);
+    }
+
+    private void add(Map<String, DeleteFile> dvByPath, DeleteFile dv) {
+      String path = dv.referencedDataFile();
+      DeleteFile existingDV = dvByPath.putIfAbsent(path, dv);
+      if (existingDV != null) {
+        throw new ValidationException(
+            "Can't index multiple DVs for %s: %s and %s",
+            path, ContentFileUtil.dvDesc(dv), ContentFileUtil.dvDesc(existingDV));
+      }
     }
 
     private void add(
diff --git a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
index c82b3ff828..e4666bd1bd 100644
--- a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
@@ -84,4 +85,17 @@ public class ContentFileUtil {
     CharSequence location = referencedDataFile(deleteFile);
     return location != null ? location.toString() : null;
   }
+
+  public static boolean isDV(DeleteFile deleteFile) {
+    return deleteFile.format() == FileFormat.PUFFIN;
+  }
+
+  public static String dvDesc(DeleteFile deleteFile) {
+    return String.format(
+        "DV{location=%s, offset=%s, length=%s, referencedDataFile=%s}",
+        deleteFile.location(),
+        deleteFile.contentOffset(),
+        deleteFile.contentSizeInBytes(),
+        deleteFile.referencedDataFile());
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
index de7e59ac17..6ef28191e7 100644
--- a/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
@@ -22,20 +22,24 @@ import static org.apache.iceberg.expressions.Expressions.bucket;
 import static org.apache.iceberg.expressions.Expressions.equal;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import org.apache.iceberg.DeleteFileIndex.EqualityDeletes;
 import org.apache.iceberg.DeleteFileIndex.PositionDeletes;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.ContentFileUtil;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -621,4 +625,52 @@ public abstract class DeleteFileIndexTestBase<
     // it should not be possible to add more elements upon indexing
     assertThatThrownBy(() -> group.add(SPEC, file1)).isInstanceOf(IllegalStateException.class);
   }
+
+  @TestTemplate
+  public void testMixDeleteFilesAndDVs() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    List<DeleteFile> deletes =
+        Arrays.asList(
+            withDataSequenceNumber(1, partitionedPosDeletes(SPEC, FILE_A.partition())),
+            withDataSequenceNumber(2, newDV(FILE_A)),
+            withDataSequenceNumber(1, partitionedPosDeletes(SPEC, FILE_B.partition())),
+            withDataSequenceNumber(2, partitionedPosDeletes(SPEC, FILE_B.partition())));
+
+    DeleteFileIndex index = DeleteFileIndex.builderFor(deletes).specsById(table.specs()).build();
+
+    DeleteFile[] fileADeletes = index.forDataFile(0, FILE_A);
+    assertThat(fileADeletes).as("Only DV should apply to FILE_A").hasSize(1);
+    assertThat(ContentFileUtil.isDV(fileADeletes[0])).isTrue();
+    assertThat(fileADeletes[0].referencedDataFile()).isEqualTo(FILE_A.location());
+
+    DeleteFile[] fileBDeletes = index.forDataFile(0, FILE_B);
+    assertThat(fileBDeletes).as("Two delete files should apply to FILE_B").hasSize(2);
+    assertThat(ContentFileUtil.isDV(fileBDeletes[0])).isFalse();
+    assertThat(ContentFileUtil.isDV(fileBDeletes[1])).isFalse();
+  }
+
+  @TestTemplate
+  public void testMultipleDVs() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    DeleteFile dv1 = withDataSequenceNumber(1, newDV(FILE_A));
+    DeleteFile dv2 = withDataSequenceNumber(2, newDV(FILE_A));
+    List<DeleteFile> dvs = Arrays.asList(dv1, dv2);
+
+    assertThatThrownBy(() -> DeleteFileIndex.builderFor(dvs).specsById(table.specs()).build())
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Can't index multiple DVs for %s", FILE_A.location());
+  }
+
+  @TestTemplate
+  public void testInvalidDVSequenceNumber() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+    DeleteFile dv = withDataSequenceNumber(1, newDV(FILE_A));
+    List<DeleteFile> dvs = Collections.singletonList(dv);
+    DeleteFileIndex index = DeleteFileIndex.builderFor(dvs).specsById(table.specs()).build();
+    assertThatThrownBy(() -> index.forDataFile(2, FILE_A))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("must be greater than or equal to data file sequence number");
+  }
 }

Reply via email to