This is an automated email from the ASF dual-hosted git repository.
ashvin pushed a commit to branch 595-detect-delete-vectors-files
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to
refs/heads/595-detect-delete-vectors-files by this push:
new b190e666 Fix commit log parsing of Delta tables with delete vector
b190e666 is described below
commit b190e666a2b2e0cf165fb1a248baad6002e5b906
Author: Ashvin Agrawal <[email protected]>
AuthorDate: Sun Dec 8 21:00:09 2024 -0800
Fix commit log parsing of Delta tables with delete vector
- Add support for `tightBounds` property in Delta stats representation
- Correct handling of delete vectors to avoid adding data file paths to
both new and removed file sets in `FileDiff`
---
.../apache/xtable/delta/DeltaActionsConverter.java | 21 +++++++
.../apache/xtable/delta/DeltaConversionSource.java | 49 +++++++++++++---
.../apache/xtable/delta/DeltaStatsExtractor.java | 1 +
.../xtable/delta/ITDeltaDeleteVectorConvert.java | 45 +++++++++++----
.../xtable/delta/TestDeltaActionsConverter.java | 66 ++++++++++++++++++++++
5 files changed, 163 insertions(+), 19 deletions(-)
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
index fbee89f4..40b822df 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
import org.apache.spark.sql.delta.actions.RemoveFile;
import org.apache.xtable.exception.NotSupportedException;
@@ -106,4 +107,24 @@ public class DeltaActionsConverter {
}
return tableBasePath + Path.SEPARATOR + dataFilePath;
}
+
+ /**
+ * Extracts the representation of the deletion vector information corresponding to an AddFile
+ * action. Currently, this method extracts and returns the path to the data file for which
+ * deletion vector data is present.
+ *
+ * @param snapshot the commit snapshot
+ * @param addFile the add file action
+ * @return the deletion vector representation (path of data file), or null if no deletion vector
+ * is present
+ */
+ public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
+ DeletionVectorDescriptor deletionVector = addFile.deletionVector();
+ if (deletionVector == null) {
+ return null;
+ }
+
+ String dataFilePath = addFile.path();
+ return getFullPathToFile(snapshot, dataFilePath);
+ }
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index 19ecc02c..ef9c47d0 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -21,8 +21,10 @@ package org.apache.xtable.delta;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -99,11 +101,16 @@ public class DeltaConversionSource implements
ConversionSource<Long> {
Snapshot snapshotAtVersion = deltaLog.getSnapshotAt(versionNumber,
Option.empty());
FileFormat fileFormat =
actionsConverter.convertToFileFormat(snapshotAtVersion.metadata().format().provider());
- Set<InternalDataFile> addedFiles = new HashSet<>();
- Set<InternalDataFile> removedFiles = new HashSet<>();
+
+ // All 3 of the following collections are indexed by data file path
+ Map<String, InternalDataFile> addedFiles = new HashMap<>();
+ Map<String, InternalDataFile> removedFiles = new HashMap<>();
+ // Set of data file paths for which deletion vectors exist.
+ Set<String> deletionVectors = new HashSet<>();
+
for (Action action : actionsForVersion) {
if (action instanceof AddFile) {
- addedFiles.add(
+ InternalDataFile dataFile =
actionsConverter.convertAddActionToInternalDataFile(
(AddFile) action,
snapshotAtVersion,
@@ -112,19 +119,47 @@ public class DeltaConversionSource implements
ConversionSource<Long> {
tableAtVersion.getReadSchema().getFields(),
true,
DeltaPartitionExtractor.getInstance(),
- DeltaStatsExtractor.getInstance()));
+ DeltaStatsExtractor.getInstance());
+ addedFiles.put(dataFile.getPhysicalPath(), dataFile);
+ String deleteVectorPath =
+ actionsConverter.extractDeletionVectorFile(snapshotAtVersion,
(AddFile) action);
+ if (deleteVectorPath != null) {
+ deletionVectors.add(deleteVectorPath);
+ }
} else if (action instanceof RemoveFile) {
- removedFiles.add(
+ InternalDataFile dataFile =
actionsConverter.convertRemoveActionToInternalDataFile(
(RemoveFile) action,
snapshotAtVersion,
fileFormat,
tableAtVersion.getPartitioningFields(),
- DeltaPartitionExtractor.getInstance()));
+ DeltaPartitionExtractor.getInstance());
+ removedFiles.put(dataFile.getPhysicalPath(), dataFile);
+ }
+ }
+
+ // In Delta Lake, if deletion vector information is added for an existing data file as a result
+ // of a delete operation, then a new RemoveFile action is added to the commit log to remove the
+ // old entry, which is replaced by a new entry: an AddFile with deletion vector information.
+ // Since the same data file is both removed and added, we need to remove it from the added and
+ // removed file maps, which are used to track the data files that were actually added or removed.
+ for (String deletionVector : deletionVectors) {
+ // validate that a Remove action is also added for the data file
+ if (removedFiles.containsKey(deletionVector)) {
+ addedFiles.remove(deletionVector);
+ removedFiles.remove(deletionVector);
+ } else {
+ log.warn(
+ "No Remove action found for the data file for which deletion
vector is added {}. This is unexpected.",
+ deletionVector);
}
}
+
DataFilesDiff dataFilesDiff =
-
DataFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
+ DataFilesDiff.builder()
+ .filesAdded(addedFiles.values())
+ .filesRemoved(removedFiles.values())
+ .build();
return
TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
index a6f74cee..d5d919b6 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
@@ -246,6 +246,7 @@ public class DeltaStatsExtractor {
Map<String, Object> minValues;
Map<String, Object> maxValues;
Map<String, Object> nullCount;
+ boolean tightBounds;
}
@Value
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
index d1d33bf8..ed02893e 100644
---
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
+++
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
@@ -21,6 +21,7 @@ package org.apache.xtable.delta;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.nio.file.Path;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -42,7 +43,13 @@ import scala.Option;
import org.apache.xtable.GenericTable;
import org.apache.xtable.TestSparkDeltaTable;
+import org.apache.xtable.ValidationTestHelper;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.storage.TableFormat;
public class ITDeltaDeleteVectorConvert {
@TempDir private static Path tempDir;
@@ -147,18 +154,32 @@ public class ITDeltaDeleteVectorConvert {
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(228L, testSparkDeltaTable.getNumRows());
- // TODO conversion fails if delete vectors are enabled, this is because of missing handlers for
- // deletion files.
- // TODO pending for another PR
- // SourceTable tableConfig =
- // SourceTable.builder()
- // .name(testSparkDeltaTable.getTableName())
- // .basePath(testSparkDeltaTable.getBasePath())
- // .formatName(TableFormat.DELTA)
- // .build();
- // DeltaConversionSource conversionSource =
- //
conversionSourceProvider.getConversionSourceInstance(tableConfig);
- // InternalSnapshot internalSnapshot =
conversionSource.getCurrentSnapshot();
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testSparkDeltaTable.getTableName())
+ .basePath(testSparkDeltaTable.getBasePath())
+ .formatName(TableFormat.DELTA)
+ .build();
+ DeltaConversionSource conversionSource =
+ conversionSourceProvider.getConversionSourceInstance(tableConfig);
+ InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
+
+ // validateDeltaPartitioning(internalSnapshot);
+ ValidationTestHelper.validateSnapshot(
+ internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));
+
+ // Get changes in incremental format.
+ InstantsForIncrementalSync instantsForIncrementalSync =
+ InstantsForIncrementalSync.builder()
+ .lastSyncInstant(Instant.ofEpochMilli(timestamp1))
+ .build();
+ CommitsBacklog<Long> commitsBacklog =
+ conversionSource.getCommitsBacklog(instantsForIncrementalSync);
+ for (Long version : commitsBacklog.getCommitsToProcess()) {
+ TableChange tableChange =
conversionSource.getTableChangeForCommit(version);
+ allTableChanges.add(tableChange);
+ }
+ ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges);
}
private void validateDeletedRecordCount(
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
new file mode 100644
index 00000000..c8e34b77
--- /dev/null
+++
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.delta;
+
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.Snapshot;
+import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
+
+import scala.Option;
+
+class TestDeltaActionsConverter {
+
+ @Test
+ void extractDeletionVector() throws URISyntaxException {
+ DeltaActionsConverter actionsConverter =
DeltaActionsConverter.getInstance();
+
+ int size = 123;
+ long time = 234L;
+ boolean dataChange = true;
+ String stats = "";
+ String filePath = "file:///file_path";
+ Snapshot snapshot = Mockito.mock(Snapshot.class);
+ DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
+
+ DeletionVectorDescriptor deletionVector = null;
+ AddFile addFileAction =
+ new AddFile(filePath, null, size, time, dataChange, stats, null,
deletionVector);
+ Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot,
addFileAction));
+
+ deletionVector =
+ DeletionVectorDescriptor.onDiskWithAbsolutePath(
+ filePath, size, 42, Option.empty(), Option.empty());
+
+ addFileAction =
+ new AddFile(filePath, null, size, time, dataChange, stats, null,
deletionVector);
+
+ Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog);
+ Mockito.when(deltaLog.dataPath()).thenReturn(new Path("file:///"));
+ Assertions.assertEquals(
+ filePath, actionsConverter.extractDeletionVectorFile(snapshot,
addFileAction));
+ }
+}