This is an automated email from the ASF dual-hosted git repository.

ashvin pushed a commit to branch 
345-read-and-translate-the-deletion-vectors-in-delta-source-table-to-xtables-internal-representation
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git

commit 5036f3b55f3bfd7a4007d8f4efcca3f0666fd3fe
Author: Ashvin Agrawal <ash...@apache.org>
AuthorDate: Tue Jan 21 21:00:09 2025 -0800

    Extract Delta Lake deletion vectors
    
    This change extracts deletion vectors represented as roaring bitmaps in 
Delta Lake files and
    converts them into the XTable intermediate representation.
    
    Previously, XTable only detected table changes that included the addition 
or removal of data files. Now
    the detected table change also includes any deletion vector files added in 
the commit.
    
    Note that, in Delta Lake, deletion vectors are represented in a 
compressed binary format.
    However, once extracted by XTable, the offsets are currently expanded into 
a list of long values.
    This representation is not the most efficient for large datasets. 
Optimization has been deferred
    in order to prioritize completing the end-to-end conversion.
---
 .../java/org/apache/xtable/model/TableChange.java  |   2 +-
 .../model/storage/InternalDeletionVector.java      |  75 ++++++++
 .../apache/xtable/delta/DeltaActionsConverter.java |  67 +++++++-
 .../apache/xtable/delta/DeltaConversionSource.java |  25 +--
 .../org/apache/xtable/TestSparkDeltaTable.java     |   7 +-
 .../org/apache/xtable/ValidationTestHelper.java    |   6 +-
 .../xtable/delta/ITDeltaDeleteVectorConvert.java   | 189 ++++++++++++++++++---
 .../xtable/delta/TestDeltaActionsConverter.java    | 164 +++++++++++++++---
 8 files changed, 478 insertions(+), 57 deletions(-)

diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java 
b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
index b425fd01..287b8f38 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
@@ -30,7 +30,7 @@ import org.apache.xtable.model.storage.InternalFilesDiff;
  * @since 0.1
  */
 @Value
-@Builder(toBuilder = true)
+@Builder(toBuilder = true, builderClassName = "Builder")
 public class TableChange {
   // Change in files at the specified instant
   InternalFilesDiff filesDiff;
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java
 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java
new file mode 100644
index 00000000..868749dc
--- /dev/null
+++ 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.model.storage;
+
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import lombok.experimental.FieldDefaults;
+import lombok.experimental.SuperBuilder;
+
+@Accessors(fluent = true)
+@SuperBuilder(toBuilder = true)
+@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE)
+@Getter
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class InternalDeletionVector extends InternalDataFile {
+  // path (absolute with scheme) of data file to which this deletion vector 
belongs
+  @NonNull String dataFilePath;
+
+  // super.getFileSizeBytes() is the size of the deletion vector file
+  // super.getPhysicalPath() is the absolute path (with scheme) of the 
deletion vector file
+  // super.getRecordCount() is the count of records in the deletion vector file
+
+  // offset of deletion vector start in a deletion vector file
+  int offset;
+
+  /**
+   * binary representation of the deletion vector. The consumer can use the 
{@link
+   * #ordinalsIterator()} to extract the ordinals represented in the binary 
format.
+   */
+  byte[] binaryRepresentation;
+
+  /**
+   * Supplier for an iterator that returns the ordinals of records deleted by 
this deletion vector
+   * in the linked data file, identified by {@link #dataFilePath}.
+   *
+   * <p>The {@link InternalDeletionVector} instance does not guarantee that a 
new or distinct result
+   * will be returned each time the supplier is invoked. However, the supplier 
is expected to return
+   * a new iterator for each call.
+   */
+  @Getter(AccessLevel.NONE)
+  Supplier<Iterator<Long>> ordinalsSupplier;
+
+  /**
+   * @return An iterator that returns the ordinals of records deleted by this 
deletion vector in the
+   *     linked data file. There is no guarantee that a new or distinct 
iterator will be returned
+   *     each time the iterator is invoked.
+   */
+  public Iterator<Long> ordinalsIterator() {
+    return ordinalsSupplier.get();
+  }
+}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
index 16a320f1..9e3b33cf 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
@@ -18,18 +18,26 @@
  
 package org.apache.xtable.delta;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.spark.sql.delta.Snapshot;
 import org.apache.spark.sql.delta.actions.AddFile;
 import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
 import org.apache.spark.sql.delta.actions.RemoveFile;
+import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray;
+import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore;
+import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.xtable.exception.NotSupportedException;
 import org.apache.xtable.model.schema.InternalField;
@@ -38,6 +46,7 @@ import org.apache.xtable.model.stat.ColumnStat;
 import org.apache.xtable.model.stat.FileStats;
 import org.apache.xtable.model.storage.FileFormat;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalDeletionVector;
 
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class DeltaActionsConverter {
@@ -113,16 +122,66 @@ public class DeltaActionsConverter {
    *
    * @param snapshot the commit snapshot
    * @param addFile the add file action
-   * @return the deletion vector representation (path of data file), or null 
if no deletion vector
-   *     is present
+   * @return the deletion vector representation, or null if no deletion vector 
is present
    */
-  public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
+  public InternalDeletionVector extractDeletionVector(Snapshot snapshot, 
AddFile addFile) {
     DeletionVectorDescriptor deletionVector = addFile.deletionVector();
     if (deletionVector == null) {
       return null;
     }
 
     String dataFilePath = addFile.path();
-    return getFullPathToFile(snapshot, dataFilePath);
+    dataFilePath = getFullPathToFile(snapshot, dataFilePath);
+
+    InternalDeletionVector.InternalDeletionVectorBuilder<?, ?> 
deleteVectorBuilder =
+        InternalDeletionVector.builder()
+            .recordCount(deletionVector.cardinality())
+            .fileSizeBytes(deletionVector.sizeInBytes())
+            .dataFilePath(dataFilePath);
+
+    if (deletionVector.isInline()) {
+      deleteVectorBuilder
+          .binaryRepresentation(deletionVector.inlineData())
+          .physicalPath("")
+          .ordinalsSupplier(() -> 
ordinalsIterator(deletionVector.inlineData()));
+    } else {
+      Path deletionVectorFilePath = 
deletionVector.absolutePath(snapshot.deltaLog().dataPath());
+      deleteVectorBuilder
+          .offset(getOffset(deletionVector))
+          .physicalPath(deletionVectorFilePath.toString())
+          .ordinalsSupplier(() -> ordinalsIterator(snapshot, deletionVector));
+    }
+
+    return deleteVectorBuilder.build();
+  }
+
+  private Iterator<Long> ordinalsIterator(byte[] bytes) {
+    RoaringBitmapArray rbm = RoaringBitmapArray.readFrom(bytes);
+    long[] ordinals = rbm.values();
+    return Arrays.stream(ordinals).iterator();
+  }
+
+  private Iterator<Long> ordinalsIterator(
+      Snapshot snapshot, DeletionVectorDescriptor deleteVector) {
+    Path deletionVectorFilePath = 
deleteVector.absolutePath(snapshot.deltaLog().dataPath());
+    int offset = getOffset(deleteVector);
+    long[] ordinals =
+        parseOrdinalFile(
+            snapshot.deltaLog().newDeltaHadoopConf(),
+            deletionVectorFilePath,
+            deleteVector.sizeInBytes(),
+            offset);
+    return Arrays.stream(ordinals).iterator();
+  }
+
+  private static int getOffset(DeletionVectorDescriptor deleteVector) {
+    return deleteVector.offset().isDefined() ? (int) 
deleteVector.offset().get() : 1;
+  }
+
+  @VisibleForTesting
+  long[] parseOrdinalFile(Configuration conf, Path filePath, int size, int 
offset) {
+    DeletionVectorStore dvStore = new HadoopFileSystemDVStore(conf);
+    RoaringBitmapArray rbm = dvStore.read(filePath, offset, size);
+    return rbm.values();
   }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index 97804d5f..554c3f64 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -22,11 +22,11 @@ import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import lombok.Builder;
 import lombok.extern.log4j.Log4j2;
@@ -53,6 +53,7 @@ import org.apache.xtable.model.TableChange;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.storage.FileFormat;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalDeletionVector;
 import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.spi.extractor.ConversionSource;
@@ -113,8 +114,8 @@ public class DeltaConversionSource implements 
ConversionSource<Long> {
     // All 3 of the following data structures use data file's absolute path as 
the key
     Map<String, InternalDataFile> addedFiles = new HashMap<>();
     Map<String, InternalDataFile> removedFiles = new HashMap<>();
-    // Set of data file paths for which deletion vectors exists.
-    Set<String> deletionVectors = new HashSet<>();
+    // Map of data file paths for which deletion vectors exists.
+    Map<String, InternalDeletionVector> deletionVectors = new HashMap<>();
 
     for (Action action : actionsForVersion) {
       if (action instanceof AddFile) {
@@ -129,10 +130,10 @@ public class DeltaConversionSource implements 
ConversionSource<Long> {
                 DeltaPartitionExtractor.getInstance(),
                 DeltaStatsExtractor.getInstance());
         addedFiles.put(dataFile.getPhysicalPath(), dataFile);
-        String deleteVectorPath =
-            actionsConverter.extractDeletionVectorFile(snapshotAtVersion, 
(AddFile) action);
-        if (deleteVectorPath != null) {
-          deletionVectors.add(deleteVectorPath);
+        InternalDeletionVector deletionVector =
+            actionsConverter.extractDeletionVector(snapshotAtVersion, 
(AddFile) action);
+        if (deletionVector != null) {
+          deletionVectors.put(deletionVector.dataFilePath(), deletionVector);
         }
       } else if (action instanceof RemoveFile) {
         InternalDataFile dataFile =
@@ -151,7 +152,7 @@ public class DeltaConversionSource implements 
ConversionSource<Long> {
     // entry which is replaced by a new entry, AddFile with delete vector 
information. Since the
     // same data file is removed and added, we need to remove it from the 
added and removed file
     // maps which are used to track actual added and removed data files.
-    for (String deletionVector : deletionVectors) {
+    for (String deletionVector : deletionVectors.keySet()) {
       // validate that a Remove action is also added for the data file
       if (removedFiles.containsKey(deletionVector)) {
         addedFiles.remove(deletionVector);
@@ -163,11 +164,15 @@ public class DeltaConversionSource implements 
ConversionSource<Long> {
       }
     }
 
+    List<InternalDataFile> allAddedFiles =
+        Stream.concat(addedFiles.values().stream(), 
deletionVectors.values().stream())
+            .collect(Collectors.toList());
     InternalFilesDiff internalFilesDiff =
         InternalFilesDiff.builder()
-            .filesAdded(addedFiles.values())
+            .filesAdded(allAddedFiles)
             .filesRemoved(removedFiles.values())
             .build();
+
     return TableChange.builder()
         .tableAsOfChange(tableAtVersion)
         .filesDiff(internalFilesDiff)
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java 
b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
index ee5b1ccd..909b1b79 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
@@ -39,6 +39,7 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.functions;
 
 import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.actions.AddFile;
 
 import com.google.common.base.Preconditions;
 
@@ -212,11 +213,15 @@ public class TestSparkDeltaTable implements 
GenericTable<Row, Object>, Closeable
   }
 
   public List<String> getAllActiveFiles() {
-    return deltaLog.snapshot().allFiles().collectAsList().stream()
+    return getAllActiveFilesInfo().stream()
         .map(addFile -> addSlashToBasePath(basePath) + addFile.path())
         .collect(Collectors.toList());
   }
 
+  public List<AddFile> getAllActiveFilesInfo() {
+    return deltaLog.snapshot().allFiles().collectAsList();
+  }
+
   private String addSlashToBasePath(String basePath) {
     if (basePath.endsWith("/")) {
       return basePath;
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java 
b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
index 9e95f279..fc64c513 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
@@ -30,6 +30,7 @@ import java.util.stream.IntStream;
 import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.TableChange;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalDeletionVector;
 
 public class ValidationTestHelper {
 
@@ -96,7 +97,10 @@ public class ValidationTestHelper {
   }
 
   private static Set<String> extractPathsFromDataFile(Set<InternalDataFile> 
dataFiles) {
-    return 
dataFiles.stream().map(InternalDataFile::getPhysicalPath).collect(Collectors.toSet());
+    return dataFiles.stream()
+        .filter(file -> !(file instanceof InternalDeletionVector))
+        .map(InternalDataFile::getPhysicalPath)
+        .collect(Collectors.toSet());
   }
 
   private static void replaceFileScheme(List<String> filePaths) {
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
 
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
index ed02893e..dcfb5f80 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
@@ -19,11 +19,16 @@
 package org.apache.xtable.delta;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +43,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.spark.sql.delta.DeltaLog;
 import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
 
 import scala.Option;
 
@@ -49,6 +55,7 @@ import org.apache.xtable.model.CommitsBacklog;
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.storage.InternalDeletionVector;
 import org.apache.xtable.model.storage.TableFormat;
 
 public class ITDeltaDeleteVectorConvert {
@@ -56,6 +63,7 @@ public class ITDeltaDeleteVectorConvert {
   private static SparkSession sparkSession;
 
   private DeltaConversionSourceProvider conversionSourceProvider;
+  private TestSparkDeltaTable testSparkDeltaTable;
 
   @BeforeAll
   public static void setupOnce() {
@@ -91,11 +99,24 @@ public class ITDeltaDeleteVectorConvert {
     conversionSourceProvider.init(hadoopConf);
   }
 
+  private static class TableState {
+    Map<String, AddFile> activeFiles;
+    List<Row> rowsToDelete;
+
+    TableState(Map<String, AddFile> activeFiles) {
+      this(activeFiles, Collections.emptyList());
+    }
+
+    TableState(Map<String, AddFile> activeFiles, List<Row> rowsToDelete) {
+      this.activeFiles = activeFiles;
+      this.rowsToDelete = rowsToDelete;
+    }
+  }
+
   @Test
   public void testInsertsUpsertsAndDeletes() {
     String tableName = GenericTable.getTableName();
-    TestSparkDeltaTable testSparkDeltaTable =
-        new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
+    testSparkDeltaTable = new TestSparkDeltaTable(tableName, tempDir, 
sparkSession, null, false);
 
     // enable deletion vectors for the test table
     testSparkDeltaTable
@@ -105,25 +126,30 @@ public class ITDeltaDeleteVectorConvert {
                 + tableName
                 + " SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
 
-    List<List<String>> allActiveFiles = new ArrayList<>();
+    List<TableState> testTableStates = new ArrayList<>();
     List<TableChange> allTableChanges = new ArrayList<>();
     List<Row> rows = testSparkDeltaTable.insertRows(50);
     Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    Map<String, AddFile> tableFiles = 
collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles, Collections.emptyList()));
 
     List<Row> rows1 = testSparkDeltaTable.insertRows(50);
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0);
     assertEquals(100L, testSparkDeltaTable.getNumRows());
-    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 
allActiveFiles.size() + 1, 0, 0);
 
     // upsert does not create delete vectors
     testSparkDeltaTable.upsertRows(rows.subList(0, 20));
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0);
     assertEquals(100L, testSparkDeltaTable.getNumRows());
-    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 
allActiveFiles.size() + 1, 0, 0);
 
     testSparkDeltaTable.insertRows(50);
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0);
     assertEquals(150L, testSparkDeltaTable.getNumRows());
 
     // delete a few rows with gaps in ids
@@ -133,12 +159,15 @@ public class ITDeltaDeleteVectorConvert {
             .collect(Collectors.toList());
     rowsToDelete.addAll(rows.subList(35, 45));
     testSparkDeltaTable.deleteRows(rowsToDelete);
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles, rowsToDelete));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15);
     assertEquals(135L, testSparkDeltaTable.getNumRows());
-    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 
allActiveFiles.size() + 1, 2, 15);
 
     testSparkDeltaTable.insertRows(50);
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15);
     assertEquals(185L, testSparkDeltaTable.getNumRows());
 
     // delete a few rows from a file which already has a deletion vector, this 
should generate a
@@ -146,18 +175,22 @@ public class ITDeltaDeleteVectorConvert {
     // This deletion step intentionally deletes the same rows again to test 
the merge.
     rowsToDelete = rows1.subList(5, 15);
     testSparkDeltaTable.deleteRows(rowsToDelete);
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles, rowsToDelete));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22);
     assertEquals(178L, testSparkDeltaTable.getNumRows());
-    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 
allActiveFiles.size() + 1, 2, 22);
 
     testSparkDeltaTable.insertRows(50);
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22);
     assertEquals(228L, testSparkDeltaTable.getNumRows());
 
+    String tableBasePath = testSparkDeltaTable.getBasePath();
     SourceTable tableConfig =
         SourceTable.builder()
             .name(testSparkDeltaTable.getTableName())
-            .basePath(testSparkDeltaTable.getBasePath())
+            .basePath(tableBasePath)
             .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
@@ -165,8 +198,9 @@ public class ITDeltaDeleteVectorConvert {
     InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
 
     //    validateDeltaPartitioning(internalSnapshot);
-    ValidationTestHelper.validateSnapshot(
-        internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));
+    List<String> activeDataFilePaths =
+        new ArrayList<>(testTableStates.get(testTableStates.size() - 
1).activeFiles.keySet());
+    ValidationTestHelper.validateSnapshot(internalSnapshot, 
activeDataFilePaths);
 
     // Get changes in incremental format.
     InstantsForIncrementalSync instantsForIncrementalSync =
@@ -179,13 +213,126 @@ public class ITDeltaDeleteVectorConvert {
       TableChange tableChange = 
conversionSource.getTableChangeForCommit(version);
       allTableChanges.add(tableChange);
     }
-    ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges);
+
+    List<List<String>> allActiveDataFilePaths =
+        testTableStates.stream()
+            .map(s -> s.activeFiles)
+            .map(Map::keySet)
+            .map(ArrayList::new)
+            .collect(Collectors.toList());
+    ValidationTestHelper.validateTableChanges(allActiveDataFilePaths, 
allTableChanges);
+
+    validateDeletionInfo(testTableStates, allTableChanges);
+  }
+
+  // collects active files in the current snapshot as a map and adds it to the 
list
+  private Map<String, AddFile> collectActiveFilesAfterCommit(
+      TestSparkDeltaTable testSparkDeltaTable) {
+    Map<String, AddFile> allFiles =
+        testSparkDeltaTable.getAllActiveFilesInfo().stream()
+            .collect(
+                Collectors.toMap(
+                    file -> getAddFileAbsolutePath(file, 
testSparkDeltaTable.getBasePath()),
+                    file -> file));
+    return allFiles;
+  }
+
+  private void validateDeletionInfo(
+      List<TableState> testTableStates, List<TableChange> allTableChanges) {
+    if (allTableChanges.isEmpty() && testTableStates.size() <= 1) {
+      return;
+    }
+
+    assertEquals(
+        allTableChanges.size(),
+        testTableStates.size() - 1,
+        "Number of table changes should be equal to number of commits - 1");
+
+    for (int i = 0; i < allTableChanges.size() - 1; i++) {
+      Map<String, AddFile> activeFileAfterCommit = testTableStates.get(i + 
1).activeFiles;
+      Map<String, AddFile> activeFileBeforeCommit = 
testTableStates.get(i).activeFiles;
+
+      Map<String, AddFile> activeFilesWithUpdatedDeleteInfo =
+          activeFileAfterCommit.entrySet().stream()
+              .filter(e -> e.getValue().deletionVector() != null)
+              .filter(
+                  entry -> {
+                    if (activeFileBeforeCommit.get(entry.getKey()) == null) {
+                      return true;
+                    }
+                    if 
(activeFileBeforeCommit.get(entry.getKey()).deletionVector() == null) {
+                      return true;
+                    }
+                    DeletionVectorDescriptor deletionVectorDescriptor =
+                        
activeFileBeforeCommit.get(entry.getKey()).deletionVector();
+                    return 
!deletionVectorDescriptor.equals(entry.getValue().deletionVector());
+                  })
+              .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+      if (activeFilesWithUpdatedDeleteInfo.isEmpty()) {
+        continue;
+      }
+
+      // validate all new delete vectors are correctly detected
+      validateDeletionInfoForCommit(
+          testTableStates.get(i + 1), activeFilesWithUpdatedDeleteInfo, 
allTableChanges.get(i));
+    }
+  }
+
+  private void validateDeletionInfoForCommit(
+      TableState tableState,
+      Map<String, AddFile> activeFilesAfterCommit,
+      TableChange changeDetectedForCommit) {
+    Map<String, InternalDeletionVector> detectedDeleteInfos =
+        changeDetectedForCommit.getFilesDiff().getFilesAdded().stream()
+            .filter(file -> file instanceof InternalDeletionVector)
+            .map(file -> (InternalDeletionVector) file)
+            .collect(Collectors.toMap(InternalDeletionVector::dataFilePath, 
file -> file));
+
+    Map<String, AddFile> filesWithDeleteVectors =
+        activeFilesAfterCommit.entrySet().stream()
+            .filter(file -> file.getValue().deletionVector() != null)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    assertEquals(filesWithDeleteVectors.size(), detectedDeleteInfos.size());
+
+    for (Map.Entry<String, AddFile> fileWithDeleteVector : 
filesWithDeleteVectors.entrySet()) {
+      InternalDeletionVector deleteInfo = 
detectedDeleteInfos.get(fileWithDeleteVector.getKey());
+      assertNotNull(deleteInfo);
+      DeletionVectorDescriptor deletionVectorDescriptor =
+          fileWithDeleteVector.getValue().deletionVector();
+      assertEquals(deletionVectorDescriptor.cardinality(), 
deleteInfo.getRecordCount());
+      assertEquals(deletionVectorDescriptor.sizeInBytes(), 
deleteInfo.getFileSizeBytes());
+      assertEquals(deletionVectorDescriptor.offset().get(), 
deleteInfo.offset());
+
+      String deletionFilePath =
+          deletionVectorDescriptor
+              .absolutePath(new 
org.apache.hadoop.fs.Path(testSparkDeltaTable.getBasePath()))
+              .toString();
+      assertEquals(deletionFilePath, deleteInfo.getPhysicalPath());
+
+      Iterator<Long> iterator = deleteInfo.ordinalsIterator();
+      List<Long> deletes = new ArrayList<>();
+      iterator.forEachRemaining(deletes::add);
+      assertEquals(deletes.size(), deleteInfo.getRecordCount());
+    }
+  }
+
+  private static String getAddFileAbsolutePath(AddFile file, String 
tableBasePath) {
+    String filePath = file.path();
+    if (filePath.startsWith(tableBasePath)) {
+      return filePath;
+    }
+    return Paths.get(tableBasePath, file.path()).toString();
   }
 
   private void validateDeletedRecordCount(
-      DeltaLog deltaLog, int version, int deleteVectorFileCount, int 
deletionRecordCount) {
+      DeltaLog deltaLog, int deleteVectorFileCount, int deletionRecordCount) {
     List<AddFile> allFiles =
-        deltaLog.getSnapshotAt(version, 
Option.empty()).allFiles().collectAsList();
+        deltaLog
+            .getSnapshotAt(deltaLog.snapshot().version(), Option.empty())
+            .allFiles()
+            .collectAsList();
     List<AddFile> filesWithDeletionVectors =
         allFiles.stream().filter(f -> f.deletionVector() != 
null).collect(Collectors.toList());
 
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
index e62e9341..117b3fc7 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
@@ -18,10 +18,20 @@
  
 package org.apache.xtable.delta;
 
-import java.net.URISyntaxException;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
@@ -29,39 +39,155 @@ import org.apache.spark.sql.delta.DeltaLog;
 import org.apache.spark.sql.delta.Snapshot;
 import org.apache.spark.sql.delta.actions.AddFile;
 import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
+import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray;
+import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArrayFormat;
 
 import scala.Option;
 
+import org.apache.xtable.model.storage.InternalDeletionVector;
+
 class TestDeltaActionsConverter {
 
+  private final String basePath = 
"https://container.blob.core.windows.net/tablepath/";;
+  private final int size = 372;
+  private final long time = 376;
+  private final boolean dataChange = true;
+  private final String stats = "";
+  private final int cardinality = 42;
+  private final int offset = 634;
+
   @Test
-  void extractDeletionVector() throws URISyntaxException {
+  void extractMissingDeletionVector() {
     DeltaActionsConverter actionsConverter = 
DeltaActionsConverter.getInstance();
 
-    int size = 123;
-    long time = 234L;
-    boolean dataChange = true;
-    String stats = "";
-    String filePath = 
"https://container.blob.core.windows.net/tablepath/file_path";;
+    String filePath = basePath + "file_path";
     Snapshot snapshot = Mockito.mock(Snapshot.class);
-    DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
 
     DeletionVectorDescriptor deletionVector = null;
     AddFile addFileAction =
         new AddFile(filePath, null, size, time, dataChange, stats, null, 
deletionVector);
-    Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot, 
addFileAction));
+    InternalDeletionVector internaldeletionVector =
+        actionsConverter.extractDeletionVector(snapshot, addFileAction);
+    assertNull(internaldeletionVector);
+  }
 
-    deletionVector =
+  @Test
+  void extractDeletionVectorInFileAbsolutePath() {
+    DeltaActionsConverter actionsConverter = 
spy(DeltaActionsConverter.getInstance());
+
+    String dataFilePath = "data_file";
+    String deleteFilePath = 
"https://container.blob.core.windows.net/tablepath/delete_file";;
+    Snapshot snapshot = Mockito.mock(Snapshot.class);
+
+    DeletionVectorDescriptor deletionVector =
         DeletionVectorDescriptor.onDiskWithAbsolutePath(
-            filePath, size, 42, Option.empty(), Option.empty());
+            deleteFilePath, size, cardinality, Option.apply(offset), 
Option.empty());
 
-    addFileAction =
-        new AddFile(filePath, null, size, time, dataChange, stats, null, 
deletionVector);
+    AddFile addFileAction =
+        new AddFile(dataFilePath, null, size, time, dataChange, stats, null, 
deletionVector);
+
+    Configuration conf = new Configuration();
+    DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
+    when(snapshot.deltaLog()).thenReturn(deltaLog);
+    when(deltaLog.dataPath()).thenReturn(new Path(basePath));
+    when(deltaLog.newDeltaHadoopConf()).thenReturn(conf);
+
+    long[] ordinals = {45, 78, 98};
+    Mockito.doReturn(ordinals)
+        .when(actionsConverter)
+        .parseOrdinalFile(conf, new Path(deleteFilePath), size, offset);
+
+    InternalDeletionVector internaldeletionVector =
+        actionsConverter.extractDeletionVector(snapshot, addFileAction);
+    assertNotNull(internaldeletionVector);
+    assertEquals(basePath + dataFilePath, 
internaldeletionVector.dataFilePath());
+    assertEquals(deleteFilePath, internaldeletionVector.getPhysicalPath());
+    assertEquals(offset, internaldeletionVector.offset());
+    assertEquals(cardinality, internaldeletionVector.getRecordCount());
+    assertEquals(size, internaldeletionVector.getFileSizeBytes());
+    assertNull(internaldeletionVector.binaryRepresentation());
+
+    Iterator<Long> iterator = internaldeletionVector.ordinalsIterator();
+    Arrays.stream(ordinals).forEach(o -> assertEquals(o, iterator.next()));
+    assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  void extractDeletionVectorInFileRelativePath() {
+    DeltaActionsConverter actionsConverter = 
spy(DeltaActionsConverter.getInstance());
+
+    String dataFilePath = "data_file";
+    UUID deleteFileId = UUID.randomUUID();
+    String deleteFilePath = basePath + "deletion_vector_" + deleteFileId + 
".bin";
+    Snapshot snapshot = Mockito.mock(Snapshot.class);
+
+    DeletionVectorDescriptor deletionVector =
+        DeletionVectorDescriptor.onDiskWithRelativePath(
+            deleteFileId, "", size, cardinality, Option.apply(offset), 
Option.empty());
+
+    AddFile addFileAction =
+        new AddFile(dataFilePath, null, size, time, dataChange, stats, null, 
deletionVector);
+
+    Configuration conf = new Configuration();
+    DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
+    when(snapshot.deltaLog()).thenReturn(deltaLog);
+    when(deltaLog.dataPath()).thenReturn(new Path(basePath));
+    when(deltaLog.newDeltaHadoopConf()).thenReturn(conf);
+
+    long[] ordinals = {45, 78, 98};
+    Mockito.doReturn(ordinals)
+        .when(actionsConverter)
+        .parseOrdinalFile(conf, new Path(deleteFilePath), size, offset);
+
+    InternalDeletionVector internaldeletionVector =
+        actionsConverter.extractDeletionVector(snapshot, addFileAction);
+    assertNotNull(internaldeletionVector);
+    assertEquals(basePath + dataFilePath, 
internaldeletionVector.dataFilePath());
+    assertEquals(deleteFilePath, internaldeletionVector.getPhysicalPath());
+    assertEquals(offset, internaldeletionVector.offset());
+    assertEquals(cardinality, internaldeletionVector.getRecordCount());
+    assertEquals(size, internaldeletionVector.getFileSizeBytes());
+    assertNull(internaldeletionVector.binaryRepresentation());
+
+    Iterator<Long> iterator = internaldeletionVector.ordinalsIterator();
+    Arrays.stream(ordinals).forEach(o -> assertEquals(o, iterator.next()));
+    assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  void extractInlineDeletionVector() {
+    DeltaActionsConverter actionsConverter = 
spy(DeltaActionsConverter.getInstance());
+
+    String dataFilePath = "data_file";
+    Snapshot snapshot = Mockito.mock(Snapshot.class);
+
+    long[] ordinals = {45, 78, 98};
+    RoaringBitmapArray rbm = new RoaringBitmapArray();
+    Arrays.stream(ordinals).forEach(rbm::add);
+    byte[] bytes = 
rbm.serializeAsByteArray(RoaringBitmapArrayFormat.Portable());
+
+    DeletionVectorDescriptor deletionVector =
+        DeletionVectorDescriptor.inlineInLog(bytes, cardinality);
+
+    AddFile addFileAction =
+        new AddFile(dataFilePath, null, size, time, dataChange, stats, null, 
deletionVector);
+
+    DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
+    when(snapshot.deltaLog()).thenReturn(deltaLog);
+    when(deltaLog.dataPath()).thenReturn(new Path(basePath));
+
+    InternalDeletionVector internaldeletionVector =
+        actionsConverter.extractDeletionVector(snapshot, addFileAction);
+    assertNotNull(internaldeletionVector);
+    assertEquals(basePath + dataFilePath, 
internaldeletionVector.dataFilePath());
+    assertArrayEquals(bytes, internaldeletionVector.binaryRepresentation());
+    assertEquals(cardinality, internaldeletionVector.getRecordCount());
+    assertEquals(bytes.length, internaldeletionVector.getFileSizeBytes());
+    assertEquals("", internaldeletionVector.getPhysicalPath());
+    assertEquals(0, internaldeletionVector.offset());
 
-    Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog);
-    Mockito.when(deltaLog.dataPath())
-        .thenReturn(new 
Path("https://container.blob.core.windows.net/tablepath";));
-    Assertions.assertEquals(
-        filePath, actionsConverter.extractDeletionVectorFile(snapshot, 
addFileAction));
+    Iterator<Long> iterator = internaldeletionVector.ordinalsIterator();
+    Arrays.stream(ordinals).forEach(o -> assertEquals(o, iterator.next()));
+    assertFalse(iterator.hasNext());
   }
 }


Reply via email to