This is an automated email from the ASF dual-hosted git repository.
ashvin pushed a commit to branch 588-delta-table-delete-rows-tests
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to
refs/heads/588-delta-table-delete-rows-tests by this push:
new 3b67fae5 Address review comments, validate deletion records
3b67fae5 is described below
commit 3b67fae52f0605daebf9c40d6dc688388ecd4e9d
Author: Ashvin Agrawal <[email protected]>
AuthorDate: Sun Dec 8 10:21:48 2024 -0800
Address review comments, validate deletion records
---
.../xtable/delta/ITDeltaDeleteVectorConvert.java | 37 ++++++++++++++++++----
1 file changed, 30 insertions(+), 7 deletions(-)
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
index d0564dda..d1d33bf8 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
@@ -35,6 +35,11 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.actions.AddFile;
+
+import scala.Option;
+
import org.apache.xtable.GenericTable;
import org.apache.xtable.TestSparkDeltaTable;
import org.apache.xtable.model.TableChange;
@@ -85,12 +90,6 @@ public class ITDeltaDeleteVectorConvert {
TestSparkDeltaTable testSparkDeltaTable =
new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
- List<List<String>> allActiveFiles = new ArrayList<>();
- List<TableChange> allTableChanges = new ArrayList<>();
- List<Row> rows = testSparkDeltaTable.insertRows(50);
- Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
- allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
-
// enable deletion vectors for the test table
testSparkDeltaTable
.getSparkSession()
@@ -99,14 +98,22 @@ public class ITDeltaDeleteVectorConvert {
+ tableName
+ " SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+ List<List<String>> allActiveFiles = new ArrayList<>();
+ List<TableChange> allTableChanges = new ArrayList<>();
+ List<Row> rows = testSparkDeltaTable.insertRows(50);
+ Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
+ allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+
List<Row> rows1 = testSparkDeltaTable.insertRows(50);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(100L, testSparkDeltaTable.getNumRows());
+ validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0);
// upsert does not create delete vectors
testSparkDeltaTable.upsertRows(rows.subList(0, 20));
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(100L, testSparkDeltaTable.getNumRows());
+ validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0);
testSparkDeltaTable.insertRows(50);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
@@ -121,6 +128,7 @@ public class ITDeltaDeleteVectorConvert {
testSparkDeltaTable.deleteRows(rowsToDelete);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(135L, testSparkDeltaTable.getNumRows());
+ validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 15);
testSparkDeltaTable.insertRows(50);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
@@ -133,6 +141,7 @@ public class ITDeltaDeleteVectorConvert {
testSparkDeltaTable.deleteRows(rowsToDelete);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(178L, testSparkDeltaTable.getNumRows());
+ validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 22);
testSparkDeltaTable.insertRows(50);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
@@ -150,6 +159,20 @@ public class ITDeltaDeleteVectorConvert {
// DeltaConversionSource conversionSource =
// conversionSourceProvider.getConversionSourceInstance(tableConfig);
// InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
- //
+ }
+
+  /**
+   * Asserts how many data files carry a deletion vector, and how many records those files report
+   * as deleted, in the snapshot of the Delta table at the given version.
+   *
+   * @param deltaLog Delta log used to load the snapshot
+   * @param version table version whose snapshot is inspected
+   * @param deleteVectorFileCount expected number of files whose deletion vector is non-null
+   * @param deletionRecordCount expected sum of {@code numDeletedRecords} over those files
+   */
+  private void validateDeletedRecordCount(
+      DeltaLog deltaLog, int version, int deleteVectorFileCount, int deletionRecordCount) {
+    // Keep only the files of the requested snapshot that have a deletion vector attached.
+    List<AddFile> vectorizedFiles =
+        deltaLog.getSnapshotAt(version, Option.empty()).allFiles().collectAsList().stream()
+            .filter(addFile -> addFile.deletionVector() != null)
+            .collect(Collectors.toList());
+
+    assertEquals(deleteVectorFileCount, vectorizedFiles.size());
+
+    // Total the per-file deleted-record counts only after the file-count check has passed.
+    long totalDeletedRecords =
+        vectorizedFiles.stream().mapToLong(AddFile::numDeletedRecords).sum();
+    assertEquals(deletionRecordCount, totalDeletedRecords);
   }
}