This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d14298fe9e Spark 3.4: Test metadata tables with format-version=v3 / add ExtensionsTestBase (#12600)
d14298fe9e is described below
commit d14298fe9e826f67e0a2c4d8e279c6b032912726
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Mar 21 13:18:54 2025 +0100
Spark 3.4: Test metadata tables with format-version=v3 / add ExtensionsTestBase (#12600)
This backports #12135 to Spark 3.4 and also includes the changes needed to switch the affected tests from JUnit4 to JUnit5.
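For readers following the migration: the JUnit4-to-JUnit5 switch replaces constructor-injected parameters with Iceberg's ParameterizedTestExtension. A minimal sketch of the pattern this commit applies (the class name and test body below are illustrative, not part of the change):

    import org.apache.iceberg.Parameter;
    import org.apache.iceberg.ParameterizedTestExtension;
    import org.apache.iceberg.Parameters;
    import org.junit.jupiter.api.TestTemplate;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(ParameterizedTestExtension.class)
    public class ExampleParameterizedTest {
      // injected by ParameterizedTestExtension instead of a JUnit4 constructor
      @Parameter(index = 0)
      private int formatVersion;

      @Parameters(name = "formatVersion = {0}")
      protected static Object[][] parameters() {
        return new Object[][] {{2}, {3}};
      }

      // @TestTemplate replaces JUnit4's @Test for parameterized runs
      @TestTemplate
      public void runsOncePerFormatVersion() {}
    }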
---
spark/v3.4/build.gradle | 4 +
.../spark/extensions/ExtensionsTestBase.java | 70 +++
.../spark/extensions/TestMergeOnReadUpdate.java | 3 +-
.../spark/extensions/TestMetadataTables.java | 480 ++++++++++++++++-----
.../apache/iceberg/spark/source/DVIterator.java | 3 +
.../org/apache/iceberg/spark/CatalogTestBase.java | 2 +-
6 files changed, 444 insertions(+), 118 deletions(-)
diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index 8fd500ac64..bf28635f3c 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -192,6 +192,10 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
antlr libs.antlr.antlr4
}
+ test {
+ useJUnitPlatform()
+ }
+
generateGrammarSource {
maxHeapSize = "64m"
arguments += ['-visitor', '-package', 'org.apache.spark.sql.catalyst.parser.extensions']
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
new file mode 100644
index 0000000000..578845e3da
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.jupiter.api.BeforeAll;
+
+public abstract class ExtensionsTestBase extends CatalogTestBase {
+
+ private static final Random RANDOM = ThreadLocalRandom.current();
+
+ @BeforeAll
+ public static void startMetastoreAndSpark() {
+ TestBase.metastore = new TestHiveMetastore();
+ metastore.start();
+ TestBase.hiveConf = metastore.hiveConf();
+
+ TestBase.spark.close();
+
+ TestBase.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.testing", "true")
+ .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
+ .config("spark.sql.extensions",
IcebergSparkSessionExtensions.class.getName())
+ .config("spark.hadoop." + METASTOREURIS.varname,
hiveConf.get(METASTOREURIS.varname))
+ .config("spark.sql.shuffle.partitions", "4")
+
.config("spark.sql.hive.metastorePartitionPruningFallbackOnException", "true")
+
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
+ .config(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
String.valueOf(RANDOM.nextBoolean()))
+ .enableHiveSupport()
+ .getOrCreate();
+
+ TestBase.sparkContext =
JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ TestBase.catalog =
+ (HiveCatalog)
+ CatalogUtil.loadCatalog(
+ HiveCatalog.class.getName(), "hive", ImmutableMap.of(),
hiveConf);
+ }
+}
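As a usage sketch, a suite built on this base class only declares its test templates; the Hive metastore and randomized Spark session come from the @BeforeAll above (the subclass name and assertion below are hypothetical):

    @ExtendWith(ParameterizedTestExtension.class)
    public class ExampleExtensionsTest extends ExtensionsTestBase {
      @TestTemplate
      public void createsEmptyTable() {
        // sql() and tableName are inherited from the shared test bases
        sql("CREATE TABLE %s (id bigint) USING iceberg", tableName);
        assertThat(sql("SELECT * FROM %s", tableName)).isEmpty();
      }
    }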
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index 36a2644729..abfaa0e587 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -39,7 +39,6 @@ import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.junit.Test;
-import org.junit.jupiter.api.TestTemplate;
public class TestMergeOnReadUpdate extends TestUpdate {
@@ -160,7 +159,7 @@ public class TestMergeOnReadUpdate extends TestUpdate {
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}
- @TestTemplate
+ @Test
public void testUpdateWithDVAndHistoricalPositionDeletes() {
assumeThat(formatVersion).isEqualTo(2);
createTableWithDeleteGranularity(
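Note the direction here is the reverse of the rest of the commit: TestUpdate, the parent suite in Spark 3.4, still runs on JUnit4, so the backported testUpdateWithDVAndHistoricalPositionDeletes keeps JUnit4's @Test annotation (presumably executed via the JUnit Vintage engine now that the module uses useJUnitPlatform(); that dependency detail is an assumption, not shown in this diff).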
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 37d590f974..3d2bf75e70 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -18,19 +18,29 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData.Record;
import org.apache.commons.collections.ListUtils;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -39,8 +49,11 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
@@ -51,27 +64,74 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.StructType;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestMetadataTables extends SparkExtensionsTestBase {
-
- public TestMetadataTables(String catalogName, String implementation, Map<String, String> config) {
- super(catalogName, implementation, config);
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestMetadataTables extends ExtensionsTestBase {
+ @Parameter(index = 3)
+ private int formatVersion;
+
+ @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, formatVersion = {3}")
+ protected static Object[][] parameters() {
+ return new Object[][] {
+ {
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ SparkCatalogConfig.HIVE.properties(),
+ 2
+ },
+ {
+ SparkCatalogConfig.HADOOP.catalogName(),
+ SparkCatalogConfig.HADOOP.implementation(),
+ SparkCatalogConfig.HADOOP.properties(),
+ 2
+ },
+ {
+ SparkCatalogConfig.SPARK.catalogName(),
+ SparkCatalogConfig.SPARK.implementation(),
+ SparkCatalogConfig.SPARK.properties(),
+ 2
+ },
+ {
+ SparkCatalogConfig.SPARK.catalogName(),
+ SparkCatalogConfig.SPARK.implementation(),
+ SparkCatalogConfig.SPARK.properties(),
+ 3
+ },
+ {
+ SparkCatalogConfig.REST.catalogName(),
+ SparkCatalogConfig.REST.implementation(),
+ ImmutableMap.builder()
+ .putAll(SparkCatalogConfig.REST.properties())
+ .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
+ .build(),
+ 2
+ },
+ {
+ SparkCatalogConfig.REST.catalogName(),
+ SparkCatalogConfig.REST.implementation(),
+ ImmutableMap.builder()
+ .putAll(SparkCatalogConfig.REST.properties())
+ .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
+ .build(),
+ 3
+ }
+ };
}
- @After
+ @AfterEach
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
- @Test
+ @TestTemplate
public void testUnpartitionedTable() throws Exception {
sql(
"CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
- + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
- tableName);
+ + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+ tableName, formatVersion);
List<SimpleRecord> records =
Lists.newArrayList(
@@ -90,8 +150,8 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Table table = Spark3Util.loadIcebergTable(spark, tableName);
List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
List<ManifestFile> expectedDeleteManifests = TestHelpers.deleteManifests(table);
- Assert.assertEquals("Should have 1 data manifest", 1, expectedDataManifests.size());
- Assert.assertEquals("Should have 1 delete manifest", 1, expectedDeleteManifests.size());
+ assertThat(expectedDataManifests).as("Should have 1 data manifest").hasSize(1);
+ assertThat(expectedDeleteManifests).as("Should have 1 delete manifest").hasSize(1);
Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".files").schema();
@@ -99,13 +159,12 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
// check delete files table
Dataset<Row> actualDeleteFilesDs = spark.sql("SELECT * FROM " + tableName + ".delete_files");
List<Row> actualDeleteFiles = TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
- Assert.assertEquals(
- "Metadata table should return one delete file", 1, actualDeleteFiles.size());
+ assertThat(actualDeleteFiles).as("Metadata table should return one delete file").hasSize(1);
List<Record> expectedDeleteFiles =
expectedEntries(
table, FileContent.POSITION_DELETES, entriesTableSchema, expectedDeleteManifests, null);
- Assert.assertEquals("Should be one delete file manifest entry", 1, expectedDeleteFiles.size());
+ assertThat(expectedDeleteFiles).as("Should be one delete file manifest entry").hasSize(1);
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
expectedDeleteFiles.get(0),
@@ -114,11 +173,11 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
// check data files table
Dataset<Row> actualDataFilesDs = spark.sql("SELECT * FROM " + tableName + ".data_files");
List<Row> actualDataFiles = TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
- Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
+ assertThat(actualDataFiles).as("Metadata table should return one data file").hasSize(1);
List<Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
- Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size());
+ assertThat(expectedDataFiles).as("Should be one data file manifest entry").hasSize(1);
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualDataFilesDs),
expectedDataFiles.get(0),
@@ -129,27 +188,87 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
spark.sql("SELECT * FROM " + tableName + ".files ORDER BY content");
List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
- Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
+ assertThat(actualFiles).as("Metadata table should return two files").hasSize(2);
List<Record> expectedFiles =
Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream())
.collect(Collectors.toList());
- Assert.assertEquals("Should have two files manifest entries", 2, expectedFiles.size());
+ assertThat(expectedFiles).as("Should have two files manifest entries").hasSize(2);
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), actualFiles.get(0));
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), actualFiles.get(1));
}
- @Test
+ @TestTemplate
+ public void testPositionDeletesTable() throws Exception {
+ sql(
+ "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+ + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+ tableName, formatVersion);
+
+ List<SimpleRecord> records =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c"),
+ new SimpleRecord(4, "d"));
+ spark
+ .createDataset(records, Encoders.bean(SimpleRecord.class))
+ .coalesce(1)
+ .writeTo(tableName)
+ .append();
+
+ sql("DELETE FROM %s WHERE id=1 OR id=3", tableName);
+
+ // check delete files table
+ assertThat(sql("SELECT * FROM %s.delete_files", tableName)).hasSize(1);
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ DataFile dataFile = Iterables.getOnlyElement(TestHelpers.dataFiles(table));
+ DeleteFile deleteFile = Iterables.getOnlyElement(TestHelpers.deleteFiles(table));
+
+ List<Object[]> expectedRows;
+ if (formatVersion >= 3) {
+ expectedRows =
+ ImmutableList.of(
+ row(
+ dataFile.location(),
+ 0L,
+ null,
+ dataFile.specId(),
+ deleteFile.location(),
+ deleteFile.contentOffset(),
+ deleteFile.contentSizeInBytes()),
+ row(
+ dataFile.location(),
+ 2L,
+ null,
+ dataFile.specId(),
+ deleteFile.location(),
+ deleteFile.contentOffset(),
+ deleteFile.contentSizeInBytes()));
+ } else {
+ expectedRows =
+ ImmutableList.of(
+ row(dataFile.location(), 0L, null, dataFile.specId(), deleteFile.location()),
+ row(dataFile.location(), 2L, null, dataFile.specId(), deleteFile.location()));
+ }
+
+ // check position_deletes table
+ assertThat(sql("SELECT * FROM %s.position_deletes", tableName))
+ .hasSize(2)
+ .containsExactlyElementsOf(expectedRows);
+ }
+
+ @TestTemplate
public void testPartitionedTable() throws Exception {
sql(
"CREATE TABLE %s (id bigint, data string) "
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES"
- + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
- tableName);
+ + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+ tableName, formatVersion);
List<SimpleRecord> recordsA =
Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -175,8 +294,8 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
List<ManifestFile> expectedDeleteManifests = TestHelpers.deleteManifests(table);
- Assert.assertEquals("Should have 2 data manifests", 2, expectedDataManifests.size());
- Assert.assertEquals("Should have 2 delete manifests", 2, expectedDeleteManifests.size());
+ assertThat(expectedDataManifests).as("Should have 2 data manifest").hasSize(2);
+ assertThat(expectedDeleteManifests).as("Should have 2 delete manifest").hasSize(2);
Schema filesTableSchema =
Spark3Util.loadIcebergTable(spark, tableName + ".delete_files").schema();
@@ -185,15 +304,13 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
List<Record> expectedDeleteFiles =
expectedEntries(
table, FileContent.POSITION_DELETES, entriesTableSchema, expectedDeleteManifests, "a");
- Assert.assertEquals(
- "Should have one delete file manifest entry", 1, expectedDeleteFiles.size());
+ assertThat(expectedDeleteFiles).as("Should have one delete file manifest entry").hasSize(1);
Dataset<Row> actualDeleteFilesDs =
spark.sql("SELECT * FROM " + tableName + ".delete_files " + "WHERE partition.data='a'");
List<Row> actualDeleteFiles = actualDeleteFilesDs.collectAsList();
- Assert.assertEquals(
- "Metadata table should return one delete file", 1, actualDeleteFiles.size());
+ assertThat(actualDeleteFiles).as("Metadata table should return one delete file").hasSize(1);
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
expectedDeleteFiles.get(0),
@@ -202,13 +319,13 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
// Check data files table
List<Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, "a");
- Assert.assertEquals("Should have one data file manifest entry", 1, expectedDataFiles.size());
+ assertThat(expectedDataFiles).as("Should have one data file manifest entry").hasSize(1);
Dataset<Row> actualDataFilesDs =
spark.sql("SELECT * FROM " + tableName + ".data_files " + "WHERE partition.data='a'");
List<Row> actualDataFiles = TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
- Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
+ assertThat(actualDataFiles).as("Metadata table should return one data file").hasSize(1);
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualDataFilesDs),
expectedDataFiles.get(0),
@@ -216,37 +333,34 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
List<Row> actualPartitionsWithProjection =
spark.sql("SELECT file_count FROM " + tableName + ".partitions ").collectAsList();
- Assert.assertEquals(
- "Metadata table should return two partitions record",
- 2,
- actualPartitionsWithProjection.size());
- for (int i = 0; i < 2; ++i) {
- Assert.assertEquals(1, actualPartitionsWithProjection.get(i).get(0));
- }
+ assertThat(actualPartitionsWithProjection)
+ .as("Metadata table should return two partitions record")
+ .hasSize(2)
+ .containsExactly(RowFactory.create(1), RowFactory.create(1));
// Check files table
List<Record> expectedFiles =
Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream())
.collect(Collectors.toList());
- Assert.assertEquals("Should have two file manifest entries", 2, expectedFiles.size());
+ assertThat(expectedFiles).as("Should have two file manifest entries").hasSize(2);
Dataset<Row> actualFilesDs =
spark.sql(
"SELECT * FROM " + tableName + ".files " + "WHERE partition.data='a' ORDER BY content");
List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
- Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
+ assertThat(actualFiles).as("Metadata table should return two files").hasSize(2);
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), actualFiles.get(0));
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), actualFiles.get(1));
}
- @Test
+ @TestTemplate
public void testAllFilesUnpartitioned() throws Exception {
sql(
"CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
- + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
- tableName);
+ + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+ tableName, formatVersion);
List<SimpleRecord> records =
Lists.newArrayList(
@@ -265,13 +379,13 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Table table = Spark3Util.loadIcebergTable(spark, tableName);
List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
- Assert.assertEquals("Should have 1 data manifest", 1, expectedDataManifests.size());
+ assertThat(expectedDataManifests).as("Should have 1 data manifest").hasSize(1);
List<ManifestFile> expectedDeleteManifests = TestHelpers.deleteManifests(table);
- Assert.assertEquals("Should have 1 delete manifest", 1, expectedDeleteManifests.size());
+ assertThat(expectedDeleteManifests).as("Should have 1 delete manifest").hasSize(1);
// Clear table to test whether 'all_files' can read past files
List<Object[]> results = sql("DELETE FROM %s", tableName);
- Assert.assertEquals("Table should be cleared", 0, results.size());
+ assertThat(results).as("Table should be cleared").isEmpty();
Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
Schema filesTableSchema =
@@ -283,8 +397,8 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
List<Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
- Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size());
- Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
+ assertThat(expectedDataFiles).as("Should be one data file manifest entry").hasSize(1);
+ assertThat(actualDataFiles).as("Metadata table should return one data file").hasSize(1);
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualDataFilesDs),
expectedDataFiles.get(0),
@@ -297,9 +411,8 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
List<Record> expectedDeleteFiles =
expectedEntries(
table, FileContent.POSITION_DELETES, entriesTableSchema, expectedDeleteManifests, null);
- Assert.assertEquals("Should be one delete file manifest entry", 1, expectedDeleteFiles.size());
- Assert.assertEquals(
- "Metadata table should return one delete file", 1, actualDeleteFiles.size());
+ assertThat(expectedDeleteFiles).as("Should be one delete file manifest entry").hasSize(1);
+ assertThat(actualDeleteFiles).as("Metadata table should return one delete file").hasSize(1);
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
expectedDeleteFiles.get(0),
@@ -311,12 +424,12 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
List<Row> actualFiles = actualFilesDs.collectAsList();
List<Record> expectedFiles = ListUtils.union(expectedDataFiles, expectedDeleteFiles);
expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content"))));
- Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
+ assertThat(actualFiles).as("Metadata table should return two files").hasSize(2);
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles, actualFiles);
}
- @Test
+ @TestTemplate
public void testAllFilesPartitioned() throws Exception {
// Create table and insert data
sql(
@@ -324,8 +437,8 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES"
- + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
- tableName);
+ + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+ tableName, formatVersion);
List<SimpleRecord> recordsA =
Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -348,13 +461,13 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Table table = Spark3Util.loadIcebergTable(spark, tableName);
List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
- Assert.assertEquals("Should have 2 data manifests", 2, expectedDataManifests.size());
+ assertThat(expectedDataManifests).as("Should have 2 data manifests").hasSize(2);
List<ManifestFile> expectedDeleteManifests = TestHelpers.deleteManifests(table);
- Assert.assertEquals("Should have 1 delete manifest", 1, expectedDeleteManifests.size());
+ assertThat(expectedDeleteManifests).as("Should have 1 delete manifest").hasSize(1);
// Clear table to test whether 'all_files' can read past files
List<Object[]> results = sql("DELETE FROM %s", tableName);
- Assert.assertEquals("Table should be cleared", 0, results.size());
+ assertThat(results).as("Table should be cleared").isEmpty();
Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
Schema filesTableSchema =
@@ -366,8 +479,8 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
List<Row> actualDataFiles = TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
List<Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, "a");
- Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size());
- Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
+ assertThat(expectedDataFiles).as("Should be one data file manifest entry").hasSize(1);
+ assertThat(actualDataFiles).as("Metadata table should return one data file").hasSize(1);
TestHelpers.assertEqualsSafe(
SparkSchemaUtil.convert(TestHelpers.selectNonDerived(actualDataFilesDs).schema())
.asStruct(),
@@ -382,8 +495,8 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
List<Record> expectedDeleteFiles =
expectedEntries(
table, FileContent.POSITION_DELETES, entriesTableSchema, expectedDeleteManifests, "a");
- Assert.assertEquals("Should be one data file manifest entry", 1, expectedDeleteFiles.size());
- Assert.assertEquals("Metadata table should return one data file", 1, actualDeleteFiles.size());
+ assertThat(expectedDeleteFiles).as("Should be one data file manifest entry").hasSize(1);
+ assertThat(actualDeleteFiles).as("Metadata table should return one data file").hasSize(1);
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
@@ -401,12 +514,12 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
List<Record> expectedFiles = ListUtils.union(expectedDataFiles, expectedDeleteFiles);
expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content"))));
- Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
+ assertThat(actualFiles).as("Metadata table should return two files").hasSize(2);
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualDataFilesDs), expectedFiles, actualFiles);
}
- @Test
+ @TestTemplate
public void testMetadataLogEntries() throws Exception {
// Create table and insert data
sql(
@@ -414,8 +527,8 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES "
- + "('format-version'='2')",
- tableName);
+ + "('format-version'='%s')",
+ tableName, formatVersion);
List<SimpleRecord> recordsA =
Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -463,8 +576,9 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
sql(
"SELECT * FROM %s.metadata_log_entries WHERE latest_snapshot_id = %s",
tableName, currentSnapshotId);
- Assert.assertEquals(
- "metadataLogEntries table should return 1 row", 1, metadataLogWithFilters.size());
+ assertThat(metadataLogWithFilters)
+ .as("metadataLogEntries table should return 1 row")
+ .hasSize(1);
assertEquals(
"Result should match the latest snapshot entry",
ImmutableList.of(
@@ -485,15 +599,16 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
metadataFiles.add(tableMetadata.metadataFileLocation());
List<Object[]> metadataLogWithProjection =
sql("SELECT file FROM %s.metadata_log_entries", tableName);
- Assert.assertEquals(
- "metadataLogEntries table should return 3 rows", 3, metadataLogWithProjection.size());
+ assertThat(metadataLogWithProjection)
+ .as("metadataLogEntries table should return 3 rows")
+ .hasSize(3);
assertEquals(
"metadataLog entry should be of same file",
metadataFiles.stream().map(this::row).collect(Collectors.toList()),
metadataLogWithProjection);
}
- @Test
+ @TestTemplate
public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
// Create table and insert data
sql(
@@ -501,8 +616,8 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES"
- + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
- tableName);
+ + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+ tableName, formatVersion);
List<SimpleRecord> recordsA =
Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -527,9 +642,7 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
optional(3, "category", Types.StringType.get())));
spark.createDataFrame(newRecords, newSparkSchema).coalesce(1).writeTo(tableName).append();
-
table.refresh();
-
Long currentSnapshotId = table.currentSnapshot().snapshotId();
Dataset<Row> actualFilesDs =
@@ -545,7 +658,7 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
List<Record> expectedFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
- Assert.assertEquals("actualFiles size should be 2", 2, actualFiles.size());
+ assertThat(actualFiles).as("actualFiles size should be 2").hasSize(2);
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), actualFiles.get(0));
@@ -553,13 +666,12 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
TestHelpers.assertEqualsSafe(
TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), actualFiles.get(1));
- Assert.assertEquals(
- "expectedFiles and actualFiles size should be the same",
- actualFiles.size(),
- expectedFiles.size());
+ assertThat(actualFiles)
+ .as("expectedFiles and actualFiles size should be the same")
+ .hasSameSizeAs(expectedFiles);
}
- @Test
+ @TestTemplate
public void testSnapshotReferencesMetatable() throws Exception {
// Create table and insert data
sql(
@@ -567,8 +679,8 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES"
- + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
- tableName);
+ + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+ tableName, formatVersion);
List<SimpleRecord> recordsA =
Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -605,43 +717,64 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
.commit();
// Check refs table
List<Row> references = spark.sql("SELECT * FROM " + tableName + ".refs").collectAsList();
- Assert.assertEquals("Refs table should return 3 rows", 3, references.size());
+ assertThat(references).as("Refs table should return 3 rows").hasSize(3);
List<Row> branches =
spark.sql("SELECT * FROM " + tableName + ".refs WHERE type='BRANCH'").collectAsList();
- Assert.assertEquals("Refs table should return 2 branches", 2, branches.size());
+ assertThat(branches).as("Refs table should return 2 branches").hasSize(2);
List<Row> tags =
spark.sql("SELECT * FROM " + tableName + ".refs WHERE type='TAG'").collectAsList();
- Assert.assertEquals("Refs table should return 1 tag", 1, tags.size());
+ assertThat(tags).as("Refs table should return 1 tag").hasSize(1);
// Check branch entries in refs table
List<Row> mainBranch =
spark
.sql("SELECT * FROM " + tableName + ".refs WHERE name = 'main' AND type='BRANCH'")
.collectAsList();
- Assert.assertEquals("main", mainBranch.get(0).getAs("name"));
- Assert.assertEquals("BRANCH", mainBranch.get(0).getAs("type"));
- Assert.assertEquals(currentSnapshotId, mainBranch.get(0).getAs("snapshot_id"));
+ assertThat(mainBranch)
+ .hasSize(1)
+ .containsExactly(RowFactory.create("main", "BRANCH", currentSnapshotId, null, null, null));
+ assertThat(mainBranch.get(0).schema().fieldNames())
+ .containsExactly(
+ "name",
+ "type",
+ "snapshot_id",
+ "max_reference_age_in_ms",
+ "min_snapshots_to_keep",
+ "max_snapshot_age_in_ms");
List<Row> testBranch =
spark
.sql("SELECT * FROM " + tableName + ".refs WHERE name = 'testBranch' AND type='BRANCH'")
.collectAsList();
- Assert.assertEquals("testBranch", testBranch.get(0).getAs("name"));
- Assert.assertEquals("BRANCH", testBranch.get(0).getAs("type"));
- Assert.assertEquals(currentSnapshotId, testBranch.get(0).getAs("snapshot_id"));
- Assert.assertEquals(Long.valueOf(10), testBranch.get(0).getAs("max_reference_age_in_ms"));
- Assert.assertEquals(Integer.valueOf(20), testBranch.get(0).getAs("min_snapshots_to_keep"));
- Assert.assertEquals(Long.valueOf(30), testBranch.get(0).getAs("max_snapshot_age_in_ms"));
+ assertThat(testBranch)
+ .hasSize(1)
+ .containsExactly(
+ RowFactory.create("testBranch", "BRANCH", currentSnapshotId, 10L, 20L, 30L));
+ assertThat(testBranch.get(0).schema().fieldNames())
+ .containsExactly(
+ "name",
+ "type",
+ "snapshot_id",
+ "max_reference_age_in_ms",
+ "min_snapshots_to_keep",
+ "max_snapshot_age_in_ms");
// Check tag entries in refs table
List<Row> testTag =
spark
.sql("SELECT * FROM " + tableName + ".refs WHERE name = 'testTag' AND type='TAG'")
.collectAsList();
- Assert.assertEquals("testTag", testTag.get(0).getAs("name"));
- Assert.assertEquals("TAG", testTag.get(0).getAs("type"));
- Assert.assertEquals(currentSnapshotId, testTag.get(0).getAs("snapshot_id"));
- Assert.assertEquals(Long.valueOf(50), testTag.get(0).getAs("max_reference_age_in_ms"));
+ assertThat(testTag)
+ .hasSize(1)
+ .containsExactly(RowFactory.create("testTag", "TAG", currentSnapshotId, 50L, null, null));
+ assertThat(testTag.get(0).schema().fieldNames())
+ .containsExactly(
+ "name",
+ "type",
+ "snapshot_id",
+ "max_reference_age_in_ms",
+ "min_snapshots_to_keep",
+ "max_snapshot_age_in_ms");
// Check projection in refs table
List<Row> testTagProjection =
@@ -651,12 +784,12 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
+ tableName
+ ".refs where type='TAG'")
.collectAsList();
- Assert.assertEquals("testTag", testTagProjection.get(0).getAs("name"));
- Assert.assertEquals("TAG", testTagProjection.get(0).getAs("type"));
- Assert.assertEquals(currentSnapshotId, testTagProjection.get(0).getAs("snapshot_id"));
- Assert.assertEquals(
- Long.valueOf(50), testTagProjection.get(0).getAs("max_reference_age_in_ms"));
- Assert.assertNull(testTagProjection.get(0).getAs("min_snapshots_to_keep"));
+ assertThat(testTagProjection)
+ .hasSize(1)
+ .containsExactly(RowFactory.create("testTag", "TAG", currentSnapshotId, 50L, null));
+ assertThat(testTagProjection.get(0).schema().fieldNames())
+ .containsExactly(
+ "name", "type", "snapshot_id", "max_reference_age_in_ms", "min_snapshots_to_keep");
List<Row> mainBranchProjection =
spark
@@ -665,21 +798,23 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
+ tableName
+ ".refs WHERE name = 'main' AND type = 'BRANCH'")
.collectAsList();
- Assert.assertEquals("main", mainBranchProjection.get(0).getAs("name"));
- Assert.assertEquals("BRANCH", mainBranchProjection.get(0).getAs("type"));
+ assertThat(mainBranchProjection)
+ .hasSize(1)
+ .containsExactly(RowFactory.create("main", "BRANCH"));
+
+ assertThat(mainBranchProjection.get(0).schema().fieldNames()).containsExactly("name", "type");
List<Row> testBranchProjection =
spark
.sql(
- "SELECT type, name, max_reference_age_in_ms, snapshot_id FROM "
+ "SELECT name, type, snapshot_id, max_reference_age_in_ms FROM "
+ tableName
+ ".refs WHERE name = 'testBranch' AND type = 'BRANCH'")
.collectAsList();
- Assert.assertEquals("testBranch",
testBranchProjection.get(0).getAs("name"));
- Assert.assertEquals("BRANCH", testBranchProjection.get(0).getAs("type"));
- Assert.assertEquals(currentSnapshotId,
testBranchProjection.get(0).getAs("snapshot_id"));
- Assert.assertEquals(
- Long.valueOf(10),
testBranchProjection.get(0).getAs("max_reference_age_in_ms"));
+ assertThat(testBranchProjection)
+ .hasSize(1)
+ .containsExactly(RowFactory.create("testBranch", "BRANCH",
currentSnapshotId, 10L));
+ assertThat(testBranchProjection.get(0).schema().fieldNames())
+ .containsExactly("name", "type", "snapshot_id",
"max_reference_age_in_ms");
}
/**
@@ -723,4 +858,119 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Record partition = (Record) file.get(4);
return partValue.equals(partition.get(0).toString());
}
+
+ @TestTemplate
+ public void metadataLogEntriesAfterReplacingTable() throws Exception {
+ assumeThat(catalogConfig.get(ICEBERG_CATALOG_TYPE))
+ .as(
+ "need to fix https://github.com/apache/iceberg/issues/11109 before
enabling this for the REST catalog")
+ .isNotEqualTo(ICEBERG_CATALOG_TYPE_REST);
+
+ sql(
+ "CREATE TABLE %s (id bigint, data string) "
+ + "USING iceberg "
+ + "PARTITIONED BY (data) "
+ + "TBLPROPERTIES "
+ + "('format-version'='%s')",
+ tableName, formatVersion);
+
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
+ assertThat(tableMetadata.snapshots()).isEmpty();
+ assertThat(tableMetadata.snapshotLog()).isEmpty();
+ assertThat(tableMetadata.currentSnapshot()).isNull();
+
+ Object[] firstEntry =
+ row(
+ DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
+ tableMetadata.metadataFileLocation(),
+ null,
+ null,
+ null);
+
+ assertThat(sql("SELECT * FROM %s.metadata_log_entries",
tableName)).containsExactly(firstEntry);
+
+ sql("INSERT INTO %s (id, data) VALUES (1, 'a')", tableName);
+
+ tableMetadata = ((HasTableOperations) table).operations().refresh();
+ assertThat(tableMetadata.snapshots()).hasSize(1);
+ assertThat(tableMetadata.snapshotLog()).hasSize(1);
+ Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+
+ Object[] secondEntry =
+ row(
+ DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
+ tableMetadata.metadataFileLocation(),
+ currentSnapshot.snapshotId(),
+ currentSnapshot.schemaId(),
+ currentSnapshot.sequenceNumber());
+
+ assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
+ .containsExactly(firstEntry, secondEntry);
+
+ sql("INSERT INTO %s (id, data) VALUES (1, 'a')", tableName);
+
+ tableMetadata = ((HasTableOperations) table).operations().refresh();
+ assertThat(tableMetadata.snapshots()).hasSize(2);
+ assertThat(tableMetadata.snapshotLog()).hasSize(2);
+ currentSnapshot = tableMetadata.currentSnapshot();
+
+ Object[] thirdEntry =
+ row(
+ DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
+ tableMetadata.metadataFileLocation(),
+ currentSnapshot.snapshotId(),
+ currentSnapshot.schemaId(),
+ currentSnapshot.sequenceNumber());
+
+ assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
+ .containsExactly(firstEntry, secondEntry, thirdEntry);
+
+ sql(
+ "CREATE OR REPLACE TABLE %s (id bigint, data string) "
+ + "USING iceberg "
+ + "PARTITIONED BY (data) "
+ + "TBLPROPERTIES "
+ + "('format-version'='%s')",
+ tableName, formatVersion);
+
+ tableMetadata = ((HasTableOperations) table).operations().refresh();
+ assertThat(tableMetadata.snapshots()).hasSize(2);
+ assertThat(tableMetadata.snapshotLog()).hasSize(2);
+
+ // currentSnapshot is null but the metadata_log_entries will refer to the last snapshot from the
+ // snapshotLog
+ assertThat(tableMetadata.currentSnapshot()).isNull();
+ HistoryEntry historyEntry = tableMetadata.snapshotLog().get(1);
+ Snapshot lastSnapshot = tableMetadata.snapshot(historyEntry.snapshotId());
+
+ Object[] fourthEntry =
+ row(
+ DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
+ tableMetadata.metadataFileLocation(),
+ lastSnapshot.snapshotId(),
+ lastSnapshot.schemaId(),
+ lastSnapshot.sequenceNumber());
+
+ assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
+ .containsExactly(firstEntry, secondEntry, thirdEntry, fourthEntry);
+
+ sql("INSERT INTO %s (id, data) VALUES (1, 'a')", tableName);
+
+ tableMetadata = ((HasTableOperations) table).operations().refresh();
+ assertThat(tableMetadata.snapshots()).hasSize(3);
+ assertThat(tableMetadata.snapshotLog()).hasSize(3);
+ currentSnapshot = tableMetadata.currentSnapshot();
+
+ Object[] fifthEntry =
+ row(
+ DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
+ tableMetadata.metadataFileLocation(),
+ currentSnapshot.snapshotId(),
+ currentSnapshot.schemaId(),
+ currentSnapshot.sequenceNumber());
+
+ assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
+ .containsExactly(firstEntry, secondEntry, thirdEntry, fourthEntry, fifthEntry);
+ }
}
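One takeaway from the new testPositionDeletesTable above: with format-version 3 the position_deletes metadata table carries two extra trailing columns describing the deletion vector. A hedged sketch of reading them inside such a test (column positions follow the test's SELECT * ordering; the index comments are inferred from the expected rows, not spelled out in this diff):

    // v2 rows end at the delete file path; v3 appends the DV's
    // content offset and content size, as asserted above.
    List<Object[]> rows = sql("SELECT * FROM %s.position_deletes", tableName);
    for (Object[] r : rows) {
      Object dataFilePath = r[0];   // path of the data file
      Object deletedPos = r[1];     // position of the deleted row
      Object deleteFilePath = r[4]; // delete file / deletion vector location
      // r[5] and r[6] exist only for formatVersion >= 3:
      // the DV's content offset and content size in bytes
    }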
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
index 0c319e2bd4..2942413a75 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
@@ -84,6 +84,9 @@ class DVIterator implements CloseableIterator<InternalRow> {
rowValues.add(deleteFile.contentOffset());
} else if (fieldId == MetadataColumns.CONTENT_SIZE_IN_BYTES_COLUMN_ID) {
rowValues.add(ScanTaskUtil.contentSizeInBytes(deleteFile));
+ } else if (fieldId == MetadataColumns.DELETE_FILE_ROW_FIELD_ID) {
+ // DVs don't track the row that was deleted
+ rowValues.add(null);
}
}
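Without this branch, a projection of position_deletes that includes the row column (such as the SELECT * in testPositionDeletesTable) would build rows with one value too few, shifting every later column; deletion vectors never carry the deleted row's content, so null is the correct value here.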
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java
index 87a49b6444..6cc100097c 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java
@@ -29,7 +29,7 @@ public abstract class CatalogTestBase extends TestBaseWithCatalog {
// these parameters are broken out to avoid changes that need to modify lots of test suites
@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
- public static Object[][] parameters() {
+ protected static Object[][] parameters() {
return new Object[][] {
{
SparkCatalogConfig.HIVE.catalogName(),