This is an automated email from the ASF dual-hosted git repository.

ashvin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new f6767883 Fix commit log parsing of Delta tables with delete vector
f6767883 is described below

commit f6767883f8c27c4ad39e4bb54352fcdae964c11d
Author: Ashvin Agrawal <ash...@apache.org>
AuthorDate: Sun Dec 8 21:00:09 2024 -0800

    Fix commit log parsing of Delta tables with delete vector
    
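    - Correct handling of deletion vectors so that a data file's path is not
      added to both the added-files and removed-files sets in `DataFilesDiff`.
      When a deletion vector is attached to an existing data file, Delta logs a
      RemoveFile and an AddFile for the same path; that pair is now dropped from
      both sets instead of being reported as a removal plus an addition.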
    - Detect file stats in the Delta log that XTable does not support, and log
      them. Such stats do not break parsing of the stats JSON; for example,
      Delta file stats include a `tightBounds` property when deletion vectors
      are enabled.
---
 .../apache/xtable/delta/DeltaActionsConverter.java | 21 +++++++
 .../apache/xtable/delta/DeltaConversionSource.java | 49 +++++++++++++---
 .../apache/xtable/delta/DeltaStatsExtractor.java   | 49 +++++++++++++++-
 .../xtable/delta/ITDeltaDeleteVectorConvert.java   | 45 +++++++++++----
 .../xtable/delta/TestDeltaActionsConverter.java    | 67 ++++++++++++++++++++++
 .../xtable/delta/TestDeltaStatsExtractor.java      |  7 +++
 6 files changed, 218 insertions(+), 20 deletions(-)

diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
index fbee89f4..40b822df 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 
 import org.apache.spark.sql.delta.Snapshot;
 import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
 import org.apache.spark.sql.delta.actions.RemoveFile;
 
 import org.apache.xtable.exception.NotSupportedException;
@@ -106,4 +107,24 @@ public class DeltaActionsConverter {
     }
     return tableBasePath + Path.SEPARATOR + dataFilePath;
   }
+
+  /**
+   * Extracts the representation of the deletion vector information 
corresponding to an AddFile
+   * action. Currently, this method extracts and returns the path to the data 
file for which a
+   * deletion vector data is present.
+   *
+   * @param snapshot the commit snapshot
+   * @param addFile the add file action
+   * @return the deletion vector representation (path of data file), or null 
if no deletion vector
+   *     is present
+   */
+  public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
+    DeletionVectorDescriptor deletionVector = addFile.deletionVector();
+    if (deletionVector == null) {
+      return null;
+    }
+
+    String dataFilePath = addFile.path();
+    return getFullPathToFile(snapshot, dataFilePath);
+  }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index 19ecc02c..a5937b02 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -21,8 +21,10 @@ package org.apache.xtable.delta;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
@@ -99,11 +101,16 @@ public class DeltaConversionSource implements 
ConversionSource<Long> {
     Snapshot snapshotAtVersion = deltaLog.getSnapshotAt(versionNumber, 
Option.empty());
     FileFormat fileFormat =
         
actionsConverter.convertToFileFormat(snapshotAtVersion.metadata().format().provider());
-    Set<InternalDataFile> addedFiles = new HashSet<>();
-    Set<InternalDataFile> removedFiles = new HashSet<>();
+
+    // All 3 of the following data structures use data file's absolute path as 
the key
+    Map<String, InternalDataFile> addedFiles = new HashMap<>();
+    Map<String, InternalDataFile> removedFiles = new HashMap<>();
+    // Set of data file paths for which deletion vectors exists.
+    Set<String> deletionVectors = new HashSet<>();
+
     for (Action action : actionsForVersion) {
       if (action instanceof AddFile) {
-        addedFiles.add(
+        InternalDataFile dataFile =
             actionsConverter.convertAddActionToInternalDataFile(
                 (AddFile) action,
                 snapshotAtVersion,
@@ -112,19 +119,47 @@ public class DeltaConversionSource implements 
ConversionSource<Long> {
                 tableAtVersion.getReadSchema().getFields(),
                 true,
                 DeltaPartitionExtractor.getInstance(),
-                DeltaStatsExtractor.getInstance()));
+                DeltaStatsExtractor.getInstance());
+        addedFiles.put(dataFile.getPhysicalPath(), dataFile);
+        String deleteVectorPath =
+            actionsConverter.extractDeletionVectorFile(snapshotAtVersion, 
(AddFile) action);
+        if (deleteVectorPath != null) {
+          deletionVectors.add(deleteVectorPath);
+        }
       } else if (action instanceof RemoveFile) {
-        removedFiles.add(
+        InternalDataFile dataFile =
             actionsConverter.convertRemoveActionToInternalDataFile(
                 (RemoveFile) action,
                 snapshotAtVersion,
                 fileFormat,
                 tableAtVersion.getPartitioningFields(),
-                DeltaPartitionExtractor.getInstance()));
+                DeltaPartitionExtractor.getInstance());
+        removedFiles.put(dataFile.getPhysicalPath(), dataFile);
+      }
+    }
+
+    // In Delta Lake if delete vector information is added for an existing 
data file, as a result of
+    // a delete operation, then a new RemoveFile action is added to the commit 
log to remove the old
+    // entry which is replaced by a new entry, AddFile with delete vector 
information. Since the
+    // same data file is removed and added, we need to remove it from the 
added and removed file
+    // maps which are used to track actual added and removed data files.
+    for (String deletionVector : deletionVectors) {
+      // validate that a Remove action is also added for the data file
+      if (removedFiles.containsKey(deletionVector)) {
+        addedFiles.remove(deletionVector);
+        removedFiles.remove(deletionVector);
+      } else {
+        log.warn(
+            "No Remove action found for the data file for which deletion 
vector is added {}. This is unexpected.",
+            deletionVector);
       }
     }
+
     DataFilesDiff dataFilesDiff =
-        
DataFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
+        DataFilesDiff.builder()
+            .filesAdded(addedFiles.values())
+            .filesRemoved(removedFiles.values())
+            .build();
     return 
TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
   }
 
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
index a6f74cee..75ecce33 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
@@ -34,15 +34,20 @@ import java.util.stream.Collectors;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
+import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Value;
+import lombok.extern.log4j.Log4j2;
 
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.spark.sql.delta.actions.AddFile;
 
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.xtable.collectors.CustomCollectors;
 import org.apache.xtable.model.exception.ParseException;
@@ -56,6 +61,7 @@ import org.apache.xtable.model.stat.Range;
  * DeltaStatsExtractor extracts column stats and also responsible for their 
serialization leveraging
  * {@link DeltaValueConverter}.
  */
+@Log4j2
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class DeltaStatsExtractor {
   private static final Set<InternalType> FIELD_TYPES_WITH_STATS_SUPPORT =
@@ -74,9 +80,13 @@ public class DeltaStatsExtractor {
 
   private static final DeltaStatsExtractor INSTANCE = new 
DeltaStatsExtractor();
 
-  private static final String PATH_DELIMITER = "\\.";
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
+  /* this data structure collects type names of all unrecognized Delta Lake 
stats. For instance
+  data file stats in presence of delete vectors would contain 'tightBounds' 
stat which is
+  currently not handled by XTable */
+  private final Set<String> unsupportedStats = new HashSet<>();
+
   public static DeltaStatsExtractor getInstance() {
     return INSTANCE;
   }
@@ -182,6 +192,8 @@ public class DeltaStatsExtractor {
     // TODO: Additional work needed to track maps & arrays.
     try {
       DeltaStats deltaStats = MAPPER.readValue(addFile.stats(), 
DeltaStats.class);
+      collectUnsupportedStats(deltaStats.getAdditionalStats());
+
       Map<String, Object> fieldPathToMaxValue = 
flattenStatMap(deltaStats.getMaxValues());
       Map<String, Object> fieldPathToMinValue = 
flattenStatMap(deltaStats.getMinValues());
       Map<String, Object> fieldPathToNullCount = 
flattenStatMap(deltaStats.getNullCount());
@@ -211,6 +223,20 @@ public class DeltaStatsExtractor {
     }
   }
 
+  private void collectUnsupportedStats(Map<String, Object> additionalStats) {
+    if (additionalStats == null || additionalStats.isEmpty()) {
+      return;
+    }
+
+    additionalStats.keySet().stream()
+        .filter(key -> !unsupportedStats.contains(key))
+        .forEach(
+            key -> {
+              log.info("Unrecognized/unsupported Delta data file stat: {}", 
key);
+              unsupportedStats.add(key);
+            });
+  }
+
   /**
    * Takes the input map which represents a json object and flattens it.
    *
@@ -239,6 +265,17 @@ public class DeltaStatsExtractor {
     return result;
   }
 
+  /**
+   * Returns the names of all unsupported stats that have been discovered 
during the parsing of
+   * Delta Lake stats.
+   *
+   * @return set of unsupported stats
+   */
+  @VisibleForTesting
+  Set<String> getUnsupportedStats() {
+    return Collections.unmodifiableSet(unsupportedStats);
+  }
+
   @Builder
   @Value
   private static class DeltaStats {
@@ -246,6 +283,16 @@ public class DeltaStatsExtractor {
     Map<String, Object> minValues;
     Map<String, Object> maxValues;
     Map<String, Object> nullCount;
+
+    /* this is a catch-all for any additional stats that are not explicitly 
handled */
+    @JsonIgnore
+    @Getter(lazy = true)
+    Map<String, Object> additionalStats = new HashMap<>();
+
+    @JsonAnySetter
+    public void setAdditionalStat(String key, Object value) {
+      getAdditionalStats().put(key, value);
+    }
   }
 
   @Value
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
 
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
index d1d33bf8..ed02893e 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
@@ -21,6 +21,7 @@ package org.apache.xtable.delta;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.nio.file.Path;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -42,7 +43,13 @@ import scala.Option;
 
 import org.apache.xtable.GenericTable;
 import org.apache.xtable.TestSparkDeltaTable;
+import org.apache.xtable.ValidationTestHelper;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.storage.TableFormat;
 
 public class ITDeltaDeleteVectorConvert {
   @TempDir private static Path tempDir;
@@ -147,18 +154,32 @@ public class ITDeltaDeleteVectorConvert {
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
     assertEquals(228L, testSparkDeltaTable.getNumRows());
 
-    // TODO conversion fails if delete vectors are enabled, this is because of 
missing handlers for
-    // deletion files.
-    // TODO pending for another PR
-    //    SourceTable tableConfig =
-    //        SourceTable.builder()
-    //            .name(testSparkDeltaTable.getTableName())
-    //            .basePath(testSparkDeltaTable.getBasePath())
-    //            .formatName(TableFormat.DELTA)
-    //            .build();
-    //    DeltaConversionSource conversionSource =
-    //        
conversionSourceProvider.getConversionSourceInstance(tableConfig);
-    //    InternalSnapshot internalSnapshot = 
conversionSource.getCurrentSnapshot();
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(testSparkDeltaTable.getTableName())
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+    InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
+
+    //    validateDeltaPartitioning(internalSnapshot);
+    ValidationTestHelper.validateSnapshot(
+        internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));
+
+    // Get changes in incremental format.
+    InstantsForIncrementalSync instantsForIncrementalSync =
+        InstantsForIncrementalSync.builder()
+            .lastSyncInstant(Instant.ofEpochMilli(timestamp1))
+            .build();
+    CommitsBacklog<Long> commitsBacklog =
+        conversionSource.getCommitsBacklog(instantsForIncrementalSync);
+    for (Long version : commitsBacklog.getCommitsToProcess()) {
+      TableChange tableChange = 
conversionSource.getTableChangeForCommit(version);
+      allTableChanges.add(tableChange);
+    }
+    ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges);
   }
 
   private void validateDeletedRecordCount(
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
new file mode 100644
index 00000000..e62e9341
--- /dev/null
+++ 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.delta;
+
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.Snapshot;
+import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
+
+import scala.Option;
+
+class TestDeltaActionsConverter {
+
+  @Test
+  void extractDeletionVector() throws URISyntaxException {
+    DeltaActionsConverter actionsConverter = 
DeltaActionsConverter.getInstance();
+
+    int size = 123;
+    long time = 234L;
+    boolean dataChange = true;
+    String stats = "";
+    String filePath = 
"https://container.blob.core.windows.net/tablepath/file_path";;
+    Snapshot snapshot = Mockito.mock(Snapshot.class);
+    DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
+
+    DeletionVectorDescriptor deletionVector = null;
+    AddFile addFileAction =
+        new AddFile(filePath, null, size, time, dataChange, stats, null, 
deletionVector);
+    Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot, 
addFileAction));
+
+    deletionVector =
+        DeletionVectorDescriptor.onDiskWithAbsolutePath(
+            filePath, size, 42, Option.empty(), Option.empty());
+
+    addFileAction =
+        new AddFile(filePath, null, size, time, dataChange, stats, null, 
deletionVector);
+
+    Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog);
+    Mockito.when(deltaLog.dataPath())
+        .thenReturn(new 
Path("https://container.blob.core.windows.net/tablepath";));
+    Assertions.assertEquals(
+        filePath, actionsConverter.extractDeletionVectorFile(snapshot, 
addFileAction));
+  }
+}
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
index dc313b67..db685883 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
@@ -20,6 +20,7 @@ package org.apache.xtable.delta;
 
 import static org.apache.xtable.testutil.ColumnStatMapUtil.getColumnStats;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -150,10 +151,16 @@ public class TestDeltaStatsExtractor {
     deltaStats.put("maxValues", maxValues);
     deltaStats.put("nullCount", nullValues);
     deltaStats.put("numRecords", 100);
+    deltaStats.put("tightBounds", Boolean.TRUE);
+    deltaStats.put("nonExisting", minValues);
     String stats = MAPPER.writeValueAsString(deltaStats);
     AddFile addFile = new AddFile("file://path/to/file", null, 0, 0, true, 
stats, null, null);
     DeltaStatsExtractor extractor = DeltaStatsExtractor.getInstance();
     List<ColumnStat> actual = extractor.getColumnStatsForFile(addFile, fields);
+    Set<String> unsupportedStats = extractor.getUnsupportedStats();
+    assertEquals(2, unsupportedStats.size());
+    assertTrue(unsupportedStats.contains("tightBounds"));
+    assertTrue(unsupportedStats.contains("nonExisting"));
 
     List<ColumnStat> expected =
         Arrays.asList(

Reply via email to