This is an automated email from the ASF dual-hosted git repository.
ashvin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new f6767883 Fix commit log parsing of Delta tables with delete vector
f6767883 is described below
commit f6767883f8c27c4ad39e4bb54352fcdae964c11d
Author: Ashvin Agrawal <[email protected]>
AuthorDate: Sun Dec 8 21:00:09 2024 -0800
Fix commit log parsing of Delta tables with delete vector
- Correct handling of delete vectors to avoid adding data file paths to both
  the new and removed file sets in `FileDiff` (a minimal sketch of the
  reconciliation follows below)
- Detect file stats in the commit logs that are not supported by XTable and
  log them. Unrecognized stats no longer break parsing of the stats JSON,
  e.g. the `tightBounds` property that appears in Delta stats when deletion
  vectors are enabled
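
For illustration, here is a minimal, self-contained sketch of the reconciliation
described in the first bullet. The file path and the example stats JSON in the
comments are hypothetical; the actual implementation operates on InternalDataFile
objects inside DeltaConversionSource (see the diff below):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class DeleteVectorReconciliationSketch {
      public static void main(String[] args) {
        // A single Delta commit that attaches a deletion vector to an existing data file
        // contains both a RemoveFile and an AddFile action for the same file. The stats
        // JSON on the new AddFile may also carry extra fields such as
        // {"numRecords":100,...,"tightBounds":false}, which are collected and logged
        // instead of failing the parse.
        String path = "s3://bucket/table/part-00000.parquet"; // hypothetical path
        Map<String, String> addedFiles = new HashMap<>();
        addedFiles.put(path, "AddFile with deletion vector");
        Map<String, String> removedFiles = new HashMap<>();
        removedFiles.put(path, "RemoveFile");
        Set<String> deletionVectors = new HashSet<>();
        deletionVectors.add(path);

        // Cancel out the add/remove pair so the file lands in neither set of the FileDiff.
        for (String dvPath : deletionVectors) {
          if (removedFiles.containsKey(dvPath)) {
            addedFiles.remove(dvPath);
            removedFiles.remove(dvPath);
          }
        }

        System.out.println("added=" + addedFiles.keySet() + ", removed=" + removedFiles.keySet());
        // prints: added=[], removed=[]
      }
    }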
---
.../apache/xtable/delta/DeltaActionsConverter.java | 21 +++++++
.../apache/xtable/delta/DeltaConversionSource.java | 49 +++++++++++++---
.../apache/xtable/delta/DeltaStatsExtractor.java | 49 +++++++++++++++-
.../xtable/delta/ITDeltaDeleteVectorConvert.java | 45 +++++++++++----
.../xtable/delta/TestDeltaActionsConverter.java | 67 ++++++++++++++++++++++
.../xtable/delta/TestDeltaStatsExtractor.java | 7 +++
6 files changed, 218 insertions(+), 20 deletions(-)
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
index fbee89f4..40b822df 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
import org.apache.spark.sql.delta.actions.RemoveFile;
import org.apache.xtable.exception.NotSupportedException;
@@ -106,4 +107,24 @@ public class DeltaActionsConverter {
}
return tableBasePath + Path.SEPARATOR + dataFilePath;
}
+
+ /**
+ * Extracts the representation of the deletion vector information corresponding to an AddFile
+ * action. Currently, this method extracts and returns the path to the data file for which
+ * deletion vector data is present.
+ *
+ * @param snapshot the commit snapshot
+ * @param addFile the add file action
+ * @return the deletion vector representation (path of data file), or null if no deletion vector
+ *     is present
+ */
+ public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
+ DeletionVectorDescriptor deletionVector = addFile.deletionVector();
+ if (deletionVector == null) {
+ return null;
+ }
+
+ String dataFilePath = addFile.path();
+ return getFullPathToFile(snapshot, dataFilePath);
+ }
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index 19ecc02c..a5937b02 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -21,8 +21,10 @@ package org.apache.xtable.delta;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -99,11 +101,16 @@ public class DeltaConversionSource implements ConversionSource<Long> {
Snapshot snapshotAtVersion = deltaLog.getSnapshotAt(versionNumber, Option.empty());
FileFormat fileFormat =
actionsConverter.convertToFileFormat(snapshotAtVersion.metadata().format().provider());
- Set<InternalDataFile> addedFiles = new HashSet<>();
- Set<InternalDataFile> removedFiles = new HashSet<>();
+
+ // All 3 of the following data structures use the data file's absolute path as the key
+ Map<String, InternalDataFile> addedFiles = new HashMap<>();
+ Map<String, InternalDataFile> removedFiles = new HashMap<>();
+ // Set of data file paths for which deletion vectors exist.
+ Set<String> deletionVectors = new HashSet<>();
+
for (Action action : actionsForVersion) {
if (action instanceof AddFile) {
- addedFiles.add(
+ InternalDataFile dataFile =
actionsConverter.convertAddActionToInternalDataFile(
(AddFile) action,
snapshotAtVersion,
@@ -112,19 +119,47 @@ public class DeltaConversionSource implements ConversionSource<Long> {
tableAtVersion.getReadSchema().getFields(),
true,
DeltaPartitionExtractor.getInstance(),
- DeltaStatsExtractor.getInstance()));
+ DeltaStatsExtractor.getInstance());
+ addedFiles.put(dataFile.getPhysicalPath(), dataFile);
+ String deleteVectorPath =
+ actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action);
+ if (deleteVectorPath != null) {
+ deletionVectors.add(deleteVectorPath);
+ }
} else if (action instanceof RemoveFile) {
- removedFiles.add(
+ InternalDataFile dataFile =
actionsConverter.convertRemoveActionToInternalDataFile(
(RemoveFile) action,
snapshotAtVersion,
fileFormat,
tableAtVersion.getPartitioningFields(),
- DeltaPartitionExtractor.getInstance()));
+ DeltaPartitionExtractor.getInstance());
+ removedFiles.put(dataFile.getPhysicalPath(), dataFile);
+ }
+ }
+
+ // In Delta Lake, when delete vector information is added for an existing data file as a
+ // result of a delete operation, a RemoveFile action is written to the commit log to remove
+ // the old entry, and a new AddFile entry carrying the delete vector information replaces it.
+ // Since the same data file is both removed and added, drop it from the added and removed
+ // file maps, which track the data files that were actually added and removed.
+ for (String deletionVector : deletionVectors) {
+ // validate that a Remove action is also added for the data file
+ if (removedFiles.containsKey(deletionVector)) {
+ addedFiles.remove(deletionVector);
+ removedFiles.remove(deletionVector);
+ } else {
+ log.warn(
+ "No Remove action found for the data file for which deletion
vector is added {}. This is unexpected.",
+ deletionVector);
}
}
+
DataFilesDiff dataFilesDiff =
- DataFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
+ DataFilesDiff.builder()
+ .filesAdded(addedFiles.values())
+ .filesRemoved(removedFiles.values())
+ .build();
return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
index a6f74cee..75ecce33 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
@@ -34,15 +34,20 @@ import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
+import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Value;
+import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.delta.actions.AddFile;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.model.exception.ParseException;
@@ -56,6 +61,7 @@ import org.apache.xtable.model.stat.Range;
* DeltaStatsExtractor extracts column stats and also responsible for their serialization leveraging
* {@link DeltaValueConverter}.
*/
+@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaStatsExtractor {
private static final Set<InternalType> FIELD_TYPES_WITH_STATS_SUPPORT =
@@ -74,9 +80,13 @@ public class DeltaStatsExtractor {
private static final DeltaStatsExtractor INSTANCE = new DeltaStatsExtractor();
- private static final String PATH_DELIMITER = "\\.";
private static final ObjectMapper MAPPER = new ObjectMapper();
+ /* This data structure collects the names of all unrecognized Delta Lake stats. For instance,
+ data file stats in the presence of delete vectors contain a 'tightBounds' stat, which is
+ currently not handled by XTable */
+ private final Set<String> unsupportedStats = new HashSet<>();
+
public static DeltaStatsExtractor getInstance() {
return INSTANCE;
}
@@ -182,6 +192,8 @@ public class DeltaStatsExtractor {
// TODO: Additional work needed to track maps & arrays.
try {
DeltaStats deltaStats = MAPPER.readValue(addFile.stats(), DeltaStats.class);
+ collectUnsupportedStats(deltaStats.getAdditionalStats());
+
Map<String, Object> fieldPathToMaxValue = flattenStatMap(deltaStats.getMaxValues());
Map<String, Object> fieldPathToMinValue = flattenStatMap(deltaStats.getMinValues());
Map<String, Object> fieldPathToNullCount = flattenStatMap(deltaStats.getNullCount());
@@ -211,6 +223,20 @@ public class DeltaStatsExtractor {
}
}
+ private void collectUnsupportedStats(Map<String, Object> additionalStats) {
+ if (additionalStats == null || additionalStats.isEmpty()) {
+ return;
+ }
+
+ additionalStats.keySet().stream()
+ .filter(key -> !unsupportedStats.contains(key))
+ .forEach(
+ key -> {
+ log.info("Unrecognized/unsupported Delta data file stat: {}",
key);
+ unsupportedStats.add(key);
+ });
+ }
+
/**
* Takes the input map which represents a json object and flattens it.
*
@@ -239,6 +265,17 @@ public class DeltaStatsExtractor {
return result;
}
+ /**
+ * Returns the names of all unsupported stats that have been discovered during the parsing of
+ * Delta Lake stats.
+ *
+ * @return set of unsupported stats
+ */
+ @VisibleForTesting
+ Set<String> getUnsupportedStats() {
+ return Collections.unmodifiableSet(unsupportedStats);
+ }
+
@Builder
@Value
private static class DeltaStats {
@@ -246,6 +283,16 @@ public class DeltaStatsExtractor {
Map<String, Object> minValues;
Map<String, Object> maxValues;
Map<String, Object> nullCount;
+
+ /* this is a catch-all for any additional stats that are not explicitly handled */
+ @JsonIgnore
+ @Getter(lazy = true)
+ Map<String, Object> additionalStats = new HashMap<>();
+
+ @JsonAnySetter
+ public void setAdditionalStat(String key, Object value) {
+ getAdditionalStats().put(key, value);
+ }
}
@Value
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
index d1d33bf8..ed02893e 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
@@ -21,6 +21,7 @@ package org.apache.xtable.delta;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.nio.file.Path;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -42,7 +43,13 @@ import scala.Option;
import org.apache.xtable.GenericTable;
import org.apache.xtable.TestSparkDeltaTable;
+import org.apache.xtable.ValidationTestHelper;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.storage.TableFormat;
public class ITDeltaDeleteVectorConvert {
@TempDir private static Path tempDir;
@@ -147,18 +154,32 @@ public class ITDeltaDeleteVectorConvert {
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(228L, testSparkDeltaTable.getNumRows());
- // TODO conversion fails if delete vectors are enabled, this is because of missing handlers for
- // deletion files.
- // TODO pending for another PR
- // SourceTable tableConfig =
- // SourceTable.builder()
- // .name(testSparkDeltaTable.getTableName())
- // .basePath(testSparkDeltaTable.getBasePath())
- // .formatName(TableFormat.DELTA)
- // .build();
- // DeltaConversionSource conversionSource =
- // conversionSourceProvider.getConversionSourceInstance(tableConfig);
- // InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testSparkDeltaTable.getTableName())
+ .basePath(testSparkDeltaTable.getBasePath())
+ .formatName(TableFormat.DELTA)
+ .build();
+ DeltaConversionSource conversionSource =
+ conversionSourceProvider.getConversionSourceInstance(tableConfig);
+ InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
+
+ // validateDeltaPartitioning(internalSnapshot);
+ ValidationTestHelper.validateSnapshot(
+ internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));
+
+ // Get changes in incremental format.
+ InstantsForIncrementalSync instantsForIncrementalSync =
+ InstantsForIncrementalSync.builder()
+ .lastSyncInstant(Instant.ofEpochMilli(timestamp1))
+ .build();
+ CommitsBacklog<Long> commitsBacklog =
+ conversionSource.getCommitsBacklog(instantsForIncrementalSync);
+ for (Long version : commitsBacklog.getCommitsToProcess()) {
+ TableChange tableChange = conversionSource.getTableChangeForCommit(version);
+ allTableChanges.add(tableChange);
+ }
+ ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges);
}
private void validateDeletedRecordCount(
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
new file mode 100644
index 00000000..e62e9341
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.delta;
+
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.Snapshot;
+import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
+
+import scala.Option;
+
+class TestDeltaActionsConverter {
+
+ @Test
+ void extractDeletionVector() throws URISyntaxException {
+ DeltaActionsConverter actionsConverter = DeltaActionsConverter.getInstance();
+
+ int size = 123;
+ long time = 234L;
+ boolean dataChange = true;
+ String stats = "";
+ String filePath = "https://container.blob.core.windows.net/tablepath/file_path";
+ Snapshot snapshot = Mockito.mock(Snapshot.class);
+ DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
+
+ DeletionVectorDescriptor deletionVector = null;
+ AddFile addFileAction =
+ new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector);
+ Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot, addFileAction));
+
+ deletionVector =
+ DeletionVectorDescriptor.onDiskWithAbsolutePath(
+ filePath, size, 42, Option.empty(), Option.empty());
+
+ addFileAction =
+ new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector);
+
+ Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog);
+ Mockito.when(deltaLog.dataPath())
+ .thenReturn(new Path("https://container.blob.core.windows.net/tablepath"));
+ Assertions.assertEquals(
+ filePath, actionsConverter.extractDeletionVectorFile(snapshot, addFileAction));
+ }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
index dc313b67..db685883 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
@@ -20,6 +20,7 @@ package org.apache.xtable.delta;
import static org.apache.xtable.testutil.ColumnStatMapUtil.getColumnStats;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.Arrays;
@@ -150,10 +151,16 @@ public class TestDeltaStatsExtractor {
deltaStats.put("maxValues", maxValues);
deltaStats.put("nullCount", nullValues);
deltaStats.put("numRecords", 100);
+ deltaStats.put("tightBounds", Boolean.TRUE);
+ deltaStats.put("nonExisting", minValues);
String stats = MAPPER.writeValueAsString(deltaStats);
AddFile addFile = new AddFile("file://path/to/file", null, 0, 0, true, stats, null, null);
DeltaStatsExtractor extractor = DeltaStatsExtractor.getInstance();
List<ColumnStat> actual = extractor.getColumnStatsForFile(addFile, fields);
+ Set<String> unsupportedStats = extractor.getUnsupportedStats();
+ assertEquals(2, unsupportedStats.size());
+ assertTrue(unsupportedStats.contains("tightBounds"));
+ assertTrue(unsupportedStats.contains("nonExisting"));
List<ColumnStat> expected =
Arrays.asList(