This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d14298fe9e Spark 3.4: Test metadata tables with format-version=v3 / 
add ExtensionsTestBase (#12600)
d14298fe9e is described below

commit d14298fe9e826f67e0a2c4d8e279c6b032912726
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Mar 21 13:18:54 2025 +0100

    Spark 3.4: Test metadata tables with format-version=v3 / add 
ExtensionsTestBase (#12600)
    
    this backports #12135 to Spark 3.4 and also includes changes around 
switching from JUnit4 to JUnit5
---
 spark/v3.4/build.gradle                            |   4 +
 .../spark/extensions/ExtensionsTestBase.java       |  70 +++
 .../spark/extensions/TestMergeOnReadUpdate.java    |   3 +-
 .../spark/extensions/TestMetadataTables.java       | 480 ++++++++++++++++-----
 .../apache/iceberg/spark/source/DVIterator.java    |   3 +
 .../org/apache/iceberg/spark/CatalogTestBase.java  |   2 +-
 6 files changed, 444 insertions(+), 118 deletions(-)

diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index 8fd500ac64..bf28635f3c 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -192,6 +192,10 @@ 
project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
     antlr libs.antlr.antlr4
   }
 
+  test {
+    useJUnitPlatform()
+  }
+
   generateGrammarSource {
     maxHeapSize = "64m"
     arguments += ['-visitor', '-package', 
'org.apache.spark.sql.catalyst.parser.extensions']
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
new file mode 100644
index 0000000000..578845e3da
--- /dev/null
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.jupiter.api.BeforeAll;
+
+public abstract class ExtensionsTestBase extends CatalogTestBase {
+
+  private static final Random RANDOM = ThreadLocalRandom.current();
+
+  @BeforeAll
+  public static void startMetastoreAndSpark() {
+    TestBase.metastore = new TestHiveMetastore();
+    metastore.start();
+    TestBase.hiveConf = metastore.hiveConf();
+
+    TestBase.spark.close();
+
+    TestBase.spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.testing", "true")
+            .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
+            .config("spark.sql.extensions", 
IcebergSparkSessionExtensions.class.getName())
+            .config("spark.hadoop." + METASTOREURIS.varname, 
hiveConf.get(METASTOREURIS.varname))
+            .config("spark.sql.shuffle.partitions", "4")
+            
.config("spark.sql.hive.metastorePartitionPruningFallbackOnException", "true")
+            
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
+            .config(
+                SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), 
String.valueOf(RANDOM.nextBoolean()))
+            .enableHiveSupport()
+            .getOrCreate();
+
+    TestBase.sparkContext = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+    TestBase.catalog =
+        (HiveCatalog)
+            CatalogUtil.loadCatalog(
+                HiveCatalog.class.getName(), "hive", ImmutableMap.of(), 
hiveConf);
+  }
+}
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index 36a2644729..abfaa0e587 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -39,7 +39,6 @@ import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.junit.Test;
-import org.junit.jupiter.api.TestTemplate;
 
 public class TestMergeOnReadUpdate extends TestUpdate {
 
@@ -160,7 +159,7 @@ public class TestMergeOnReadUpdate extends TestUpdate {
         sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
   }
 
-  @TestTemplate
+  @Test
   public void testUpdateWithDVAndHistoricalPositionDeletes() {
     assumeThat(formatVersion).isEqualTo(2);
     createTableWithDeleteGranularity(
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 37d590f974..3d2bf75e70 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -18,19 +18,29 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.commons.collections.ListUtils;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -39,8 +49,11 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
@@ -51,27 +64,74 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.StructType;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestMetadataTables extends SparkExtensionsTestBase {
-
-  public TestMetadataTables(String catalogName, String implementation, 
Map<String, String> config) {
-    super(catalogName, implementation, config);
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestMetadataTables extends ExtensionsTestBase {
+  @Parameter(index = 3)
+  private int formatVersion;
+
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, 
formatVersion = {3}")
+  protected static Object[][] parameters() {
+    return new Object[][] {
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties(),
+        2
+      },
+      {
+        SparkCatalogConfig.HADOOP.catalogName(),
+        SparkCatalogConfig.HADOOP.implementation(),
+        SparkCatalogConfig.HADOOP.properties(),
+        2
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        2
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        3
+      },
+      {
+        SparkCatalogConfig.REST.catalogName(),
+        SparkCatalogConfig.REST.implementation(),
+        ImmutableMap.builder()
+            .putAll(SparkCatalogConfig.REST.properties())
+            .put(CatalogProperties.URI, 
restCatalog.properties().get(CatalogProperties.URI))
+            .build(),
+        2
+      },
+      {
+        SparkCatalogConfig.REST.catalogName(),
+        SparkCatalogConfig.REST.implementation(),
+        ImmutableMap.builder()
+            .putAll(SparkCatalogConfig.REST.properties())
+            .put(CatalogProperties.URI, 
restCatalog.properties().get(CatalogProperties.URI))
+            .build(),
+        3
+      }
+    };
   }
 
-  @After
+  @AfterEach
   public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
-  @Test
+  @TestTemplate
   public void testUnpartitionedTable() throws Exception {
     sql(
         "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
-            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
-        tableName);
+            + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+        tableName, formatVersion);
 
     List<SimpleRecord> records =
         Lists.newArrayList(
@@ -90,8 +150,8 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     Table table = Spark3Util.loadIcebergTable(spark, tableName);
     List<ManifestFile> expectedDataManifests = 
TestHelpers.dataManifests(table);
     List<ManifestFile> expectedDeleteManifests = 
TestHelpers.deleteManifests(table);
-    Assert.assertEquals("Should have 1 data manifest", 1, 
expectedDataManifests.size());
-    Assert.assertEquals("Should have 1 delete manifest", 1, 
expectedDeleteManifests.size());
+    assertThat(expectedDataManifests).as("Should have 1 data 
manifest").hasSize(1);
+    assertThat(expectedDeleteManifests).as("Should have 1 delete 
manifest").hasSize(1);
 
     Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + 
".entries").schema();
     Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + 
".files").schema();
@@ -99,13 +159,12 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     // check delete files table
     Dataset<Row> actualDeleteFilesDs = spark.sql("SELECT * FROM " + tableName 
+ ".delete_files");
     List<Row> actualDeleteFiles = 
TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
-    Assert.assertEquals(
-        "Metadata table should return one delete file", 1, 
actualDeleteFiles.size());
+    assertThat(actualDeleteFiles).as("Metadata table should return one delete 
file").hasSize(1);
 
     List<Record> expectedDeleteFiles =
         expectedEntries(
             table, FileContent.POSITION_DELETES, entriesTableSchema, 
expectedDeleteManifests, null);
-    Assert.assertEquals("Should be one delete file manifest entry", 1, 
expectedDeleteFiles.size());
+    assertThat(expectedDeleteFiles).as("Should be one delete file manifest 
entry").hasSize(1);
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
         expectedDeleteFiles.get(0),
@@ -114,11 +173,11 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     // check data files table
     Dataset<Row> actualDataFilesDs = spark.sql("SELECT * FROM " + tableName + 
".data_files");
     List<Row> actualDataFiles = 
TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
-    Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
+    assertThat(actualDataFiles).as("Metadata table should return one data 
file").hasSize(1);
 
     List<Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, null);
-    Assert.assertEquals("Should be one data file manifest entry", 1, 
expectedDataFiles.size());
+    assertThat(expectedDataFiles).as("Should be one data file manifest 
entry").hasSize(1);
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualDataFilesDs),
         expectedDataFiles.get(0),
@@ -129,27 +188,87 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
         spark.sql("SELECT * FROM " + tableName + ".files ORDER BY content");
     List<Row> actualFiles = 
TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
 
-    Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
+    assertThat(actualFiles).as("Metadata table should return two 
files").hasSize(2);
 
     List<Record> expectedFiles =
         Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream())
             .collect(Collectors.toList());
-    Assert.assertEquals("Should have two files manifest entries", 2, 
expectedFiles.size());
+    assertThat(expectedFiles).as("Should have two files manifest 
entries").hasSize(2);
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), 
actualFiles.get(0));
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), 
actualFiles.get(1));
   }
 
-  @Test
+  @TestTemplate
+  public void testPositionDeletesTable() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+        tableName, formatVersion);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    sql("DELETE FROM %s WHERE id=1 OR id=3", tableName);
+
+    // check delete files table
+    assertThat(sql("SELECT * FROM %s.delete_files", tableName)).hasSize(1);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    DataFile dataFile = Iterables.getOnlyElement(TestHelpers.dataFiles(table));
+    DeleteFile deleteFile = 
Iterables.getOnlyElement(TestHelpers.deleteFiles(table));
+
+    List<Object[]> expectedRows;
+    if (formatVersion >= 3) {
+      expectedRows =
+          ImmutableList.of(
+              row(
+                  dataFile.location(),
+                  0L,
+                  null,
+                  dataFile.specId(),
+                  deleteFile.location(),
+                  deleteFile.contentOffset(),
+                  deleteFile.contentSizeInBytes()),
+              row(
+                  dataFile.location(),
+                  2L,
+                  null,
+                  dataFile.specId(),
+                  deleteFile.location(),
+                  deleteFile.contentOffset(),
+                  deleteFile.contentSizeInBytes()));
+    } else {
+      expectedRows =
+          ImmutableList.of(
+              row(dataFile.location(), 0L, null, dataFile.specId(), 
deleteFile.location()),
+              row(dataFile.location(), 2L, null, dataFile.specId(), 
deleteFile.location()));
+    }
+
+    // check position_deletes table
+    assertThat(sql("SELECT * FROM %s.position_deletes", tableName))
+        .hasSize(2)
+        .containsExactlyElementsOf(expectedRows);
+  }
+
+  @TestTemplate
   public void testPartitionedTable() throws Exception {
     sql(
         "CREATE TABLE %s (id bigint, data string) "
             + "USING iceberg "
             + "PARTITIONED BY (data) "
             + "TBLPROPERTIES"
-            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
-        tableName);
+            + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+        tableName, formatVersion);
 
     List<SimpleRecord> recordsA =
         Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -175,8 +294,8 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
 
     List<ManifestFile> expectedDataManifests = 
TestHelpers.dataManifests(table);
     List<ManifestFile> expectedDeleteManifests = 
TestHelpers.deleteManifests(table);
-    Assert.assertEquals("Should have 2 data manifests", 2, 
expectedDataManifests.size());
-    Assert.assertEquals("Should have 2 delete manifests", 2, 
expectedDeleteManifests.size());
+    assertThat(expectedDataManifests).as("Should have 2 data 
manifest").hasSize(2);
+    assertThat(expectedDeleteManifests).as("Should have 2 delete 
manifest").hasSize(2);
 
     Schema filesTableSchema =
         Spark3Util.loadIcebergTable(spark, tableName + 
".delete_files").schema();
@@ -185,15 +304,13 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     List<Record> expectedDeleteFiles =
         expectedEntries(
             table, FileContent.POSITION_DELETES, entriesTableSchema, 
expectedDeleteManifests, "a");
-    Assert.assertEquals(
-        "Should have one delete file manifest entry", 1, 
expectedDeleteFiles.size());
+    assertThat(expectedDeleteFiles).as("Should have one delete file manifest 
entry").hasSize(1);
 
     Dataset<Row> actualDeleteFilesDs =
         spark.sql("SELECT * FROM " + tableName + ".delete_files " + "WHERE 
partition.data='a'");
     List<Row> actualDeleteFiles = actualDeleteFilesDs.collectAsList();
 
-    Assert.assertEquals(
-        "Metadata table should return one delete file", 1, 
actualDeleteFiles.size());
+    assertThat(actualDeleteFiles).as("Metadata table should return one delete 
file").hasSize(1);
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
         expectedDeleteFiles.get(0),
@@ -202,13 +319,13 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     // Check data files table
     List<Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, "a");
-    Assert.assertEquals("Should have one data file manifest entry", 1, 
expectedDataFiles.size());
+    assertThat(expectedDataFiles).as("Should have one data file manifest 
entry").hasSize(1);
 
     Dataset<Row> actualDataFilesDs =
         spark.sql("SELECT * FROM " + tableName + ".data_files " + "WHERE 
partition.data='a'");
 
     List<Row> actualDataFiles = 
TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
-    Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
+    assertThat(actualDataFiles).as("Metadata table should return one data 
file").hasSize(1);
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualDataFilesDs),
         expectedDataFiles.get(0),
@@ -216,37 +333,34 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
 
     List<Row> actualPartitionsWithProjection =
         spark.sql("SELECT file_count FROM " + tableName + ".partitions 
").collectAsList();
-    Assert.assertEquals(
-        "Metadata table should return two partitions record",
-        2,
-        actualPartitionsWithProjection.size());
-    for (int i = 0; i < 2; ++i) {
-      Assert.assertEquals(1, actualPartitionsWithProjection.get(i).get(0));
-    }
+    assertThat(actualPartitionsWithProjection)
+        .as("Metadata table should return two partitions record")
+        .hasSize(2)
+        .containsExactly(RowFactory.create(1), RowFactory.create(1));
 
     // Check files table
     List<Record> expectedFiles =
         Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream())
             .collect(Collectors.toList());
-    Assert.assertEquals("Should have two file manifest entries", 2, 
expectedFiles.size());
+    assertThat(expectedFiles).as("Should have two file manifest 
entries").hasSize(2);
 
     Dataset<Row> actualFilesDs =
         spark.sql(
             "SELECT * FROM " + tableName + ".files " + "WHERE 
partition.data='a' ORDER BY content");
     List<Row> actualFiles = 
TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
-    Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
+    assertThat(actualFiles).as("Metadata table should return two 
files").hasSize(2);
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), 
actualFiles.get(0));
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), 
actualFiles.get(1));
   }
 
-  @Test
+  @TestTemplate
   public void testAllFilesUnpartitioned() throws Exception {
     sql(
         "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
-            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
-        tableName);
+            + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+        tableName, formatVersion);
 
     List<SimpleRecord> records =
         Lists.newArrayList(
@@ -265,13 +379,13 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
 
     Table table = Spark3Util.loadIcebergTable(spark, tableName);
     List<ManifestFile> expectedDataManifests = 
TestHelpers.dataManifests(table);
-    Assert.assertEquals("Should have 1 data manifest", 1, 
expectedDataManifests.size());
+    assertThat(expectedDataManifests).as("Should have 1 data 
manifest").hasSize(1);
     List<ManifestFile> expectedDeleteManifests = 
TestHelpers.deleteManifests(table);
-    Assert.assertEquals("Should have 1 delete manifest", 1, 
expectedDeleteManifests.size());
+    assertThat(expectedDeleteManifests).as("Should have 1 delete 
manifest").hasSize(1);
 
     // Clear table to test whether 'all_files' can read past files
     List<Object[]> results = sql("DELETE FROM %s", tableName);
-    Assert.assertEquals("Table should be cleared", 0, results.size());
+    assertThat(results).as("Table should be cleared").isEmpty();
 
     Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + 
".entries").schema();
     Schema filesTableSchema =
@@ -283,8 +397,8 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
 
     List<Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, null);
-    Assert.assertEquals("Should be one data file manifest entry", 1, 
expectedDataFiles.size());
-    Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
+    assertThat(expectedDataFiles).as("Should be one data file manifest 
entry").hasSize(1);
+    assertThat(actualDataFiles).as("Metadata table should return one data 
file").hasSize(1);
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualDataFilesDs),
         expectedDataFiles.get(0),
@@ -297,9 +411,8 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     List<Record> expectedDeleteFiles =
         expectedEntries(
             table, FileContent.POSITION_DELETES, entriesTableSchema, 
expectedDeleteManifests, null);
-    Assert.assertEquals("Should be one delete file manifest entry", 1, 
expectedDeleteFiles.size());
-    Assert.assertEquals(
-        "Metadata table should return one delete file", 1, 
actualDeleteFiles.size());
+    assertThat(expectedDeleteFiles).as("Should be one delete file manifest 
entry").hasSize(1);
+    assertThat(actualDeleteFiles).as("Metadata table should return one delete 
file").hasSize(1);
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
         expectedDeleteFiles.get(0),
@@ -311,12 +424,12 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     List<Row> actualFiles = actualFilesDs.collectAsList();
     List<Record> expectedFiles = ListUtils.union(expectedDataFiles, 
expectedDeleteFiles);
     expectedFiles.sort(Comparator.comparing(r -> ((Integer) 
r.get("content"))));
-    Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
+    assertThat(actualFiles).as("Metadata table should return two 
files").hasSize(2);
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles, 
actualFiles);
   }
 
-  @Test
+  @TestTemplate
   public void testAllFilesPartitioned() throws Exception {
     // Create table and insert data
     sql(
@@ -324,8 +437,8 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
             + "USING iceberg "
             + "PARTITIONED BY (data) "
             + "TBLPROPERTIES"
-            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
-        tableName);
+            + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+        tableName, formatVersion);
 
     List<SimpleRecord> recordsA =
         Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -348,13 +461,13 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
 
     Table table = Spark3Util.loadIcebergTable(spark, tableName);
     List<ManifestFile> expectedDataManifests = 
TestHelpers.dataManifests(table);
-    Assert.assertEquals("Should have 2 data manifests", 2, 
expectedDataManifests.size());
+    assertThat(expectedDataManifests).as("Should have 2 data 
manifests").hasSize(2);
     List<ManifestFile> expectedDeleteManifests = 
TestHelpers.deleteManifests(table);
-    Assert.assertEquals("Should have 1 delete manifest", 1, 
expectedDeleteManifests.size());
+    assertThat(expectedDeleteManifests).as("Should have 1 delete 
manifest").hasSize(1);
 
     // Clear table to test whether 'all_files' can read past files
     List<Object[]> results = sql("DELETE FROM %s", tableName);
-    Assert.assertEquals("Table should be cleared", 0, results.size());
+    assertThat(results).as("Table should be cleared").isEmpty();
 
     Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + 
".entries").schema();
     Schema filesTableSchema =
@@ -366,8 +479,8 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     List<Row> actualDataFiles = 
TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
     List<Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, "a");
-    Assert.assertEquals("Should be one data file manifest entry", 1, 
expectedDataFiles.size());
-    Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
+    assertThat(expectedDataFiles).as("Should be one data file manifest 
entry").hasSize(1);
+    assertThat(actualDataFiles).as("Metadata table should return one data 
file").hasSize(1);
     TestHelpers.assertEqualsSafe(
         
SparkSchemaUtil.convert(TestHelpers.selectNonDerived(actualDataFilesDs).schema())
             .asStruct(),
@@ -382,8 +495,8 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     List<Record> expectedDeleteFiles =
         expectedEntries(
             table, FileContent.POSITION_DELETES, entriesTableSchema, 
expectedDeleteManifests, "a");
-    Assert.assertEquals("Should be one data file manifest entry", 1, 
expectedDeleteFiles.size());
-    Assert.assertEquals("Metadata table should return one data file", 1, 
actualDeleteFiles.size());
+    assertThat(expectedDeleteFiles).as("Should be one data file manifest 
entry").hasSize(1);
+    assertThat(actualDeleteFiles).as("Metadata table should return one data 
file").hasSize(1);
 
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
@@ -401,12 +514,12 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
 
     List<Record> expectedFiles = ListUtils.union(expectedDataFiles, 
expectedDeleteFiles);
     expectedFiles.sort(Comparator.comparing(r -> ((Integer) 
r.get("content"))));
-    Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
+    assertThat(actualFiles).as("Metadata table should return two 
files").hasSize(2);
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualDataFilesDs), expectedFiles, 
actualFiles);
   }
 
-  @Test
+  @TestTemplate
   public void testMetadataLogEntries() throws Exception {
     // Create table and insert data
     sql(
@@ -414,8 +527,8 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
             + "USING iceberg "
             + "PARTITIONED BY (data) "
             + "TBLPROPERTIES "
-            + "('format-version'='2')",
-        tableName);
+            + "('format-version'='%s')",
+        tableName, formatVersion);
 
     List<SimpleRecord> recordsA =
         Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -463,8 +576,9 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
         sql(
             "SELECT * FROM %s.metadata_log_entries WHERE latest_snapshot_id = 
%s",
             tableName, currentSnapshotId);
-    Assert.assertEquals(
-        "metadataLogEntries table should return 1 row", 1, 
metadataLogWithFilters.size());
+    assertThat(metadataLogWithFilters)
+        .as("metadataLogEntries table should return 1 row")
+        .hasSize(1);
     assertEquals(
         "Result should match the latest snapshot entry",
         ImmutableList.of(
@@ -485,15 +599,16 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     metadataFiles.add(tableMetadata.metadataFileLocation());
     List<Object[]> metadataLogWithProjection =
         sql("SELECT file FROM %s.metadata_log_entries", tableName);
-    Assert.assertEquals(
-        "metadataLogEntries table should return 3 rows", 3, 
metadataLogWithProjection.size());
+    assertThat(metadataLogWithProjection)
+        .as("metadataLogEntries table should return 3 rows")
+        .hasSize(3);
     assertEquals(
         "metadataLog entry should be of same file",
         metadataFiles.stream().map(this::row).collect(Collectors.toList()),
         metadataLogWithProjection);
   }
 
-  @Test
+  @TestTemplate
   public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
     // Create table and insert data
     sql(
@@ -501,8 +616,8 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
             + "USING iceberg "
             + "PARTITIONED BY (data) "
             + "TBLPROPERTIES"
-            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
-        tableName);
+            + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+        tableName, formatVersion);
 
     List<SimpleRecord> recordsA =
         Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -527,9 +642,7 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
                 optional(3, "category", Types.StringType.get())));
 
     spark.createDataFrame(newRecords, 
newSparkSchema).coalesce(1).writeTo(tableName).append();
-
     table.refresh();
-
     Long currentSnapshotId = table.currentSnapshot().snapshotId();
 
     Dataset<Row> actualFilesDs =
@@ -545,7 +658,7 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     List<Record> expectedFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, null);
 
-    Assert.assertEquals("actualFiles size should be 2", 2, actualFiles.size());
+    assertThat(actualFiles).as("actualFiles size should be 2").hasSize(2);
 
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), 
actualFiles.get(0));
@@ -553,13 +666,12 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     TestHelpers.assertEqualsSafe(
         TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), 
actualFiles.get(1));
 
-    Assert.assertEquals(
-        "expectedFiles and actualFiles size should be the same",
-        actualFiles.size(),
-        expectedFiles.size());
+    assertThat(actualFiles)
+        .as("expectedFiles and actualFiles size should be the same")
+        .hasSameSizeAs(expectedFiles);
   }
 
-  @Test
+  @TestTemplate
   public void testSnapshotReferencesMetatable() throws Exception {
     // Create table and insert data
     sql(
@@ -567,8 +679,8 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
             + "USING iceberg "
             + "PARTITIONED BY (data) "
             + "TBLPROPERTIES"
-            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
-        tableName);
+            + "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
+        tableName, formatVersion);
 
     List<SimpleRecord> recordsA =
         Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -605,43 +717,64 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
         .commit();
     // Check refs table
     List<Row> references = spark.sql("SELECT * FROM " + tableName + 
".refs").collectAsList();
-    Assert.assertEquals("Refs table should return 3 rows", 3, 
references.size());
+    assertThat(references).as("Refs table should return 3 rows").hasSize(3);
     List<Row> branches =
         spark.sql("SELECT * FROM " + tableName + ".refs WHERE 
type='BRANCH'").collectAsList();
-    Assert.assertEquals("Refs table should return 2 branches", 2, 
branches.size());
+    assertThat(branches).as("Refs table should return 2 branches").hasSize(2);
     List<Row> tags =
         spark.sql("SELECT * FROM " + tableName + ".refs WHERE 
type='TAG'").collectAsList();
-    Assert.assertEquals("Refs table should return 1 tag", 1, tags.size());
+    assertThat(tags).as("Refs table should return 1 tag").hasSize(1);
 
     // Check branch entries in refs table
     List<Row> mainBranch =
         spark
             .sql("SELECT * FROM " + tableName + ".refs WHERE name = 'main' AND 
type='BRANCH'")
             .collectAsList();
-    Assert.assertEquals("main", mainBranch.get(0).getAs("name"));
-    Assert.assertEquals("BRANCH", mainBranch.get(0).getAs("type"));
-    Assert.assertEquals(currentSnapshotId, 
mainBranch.get(0).getAs("snapshot_id"));
+    assertThat(mainBranch)
+        .hasSize(1)
+        .containsExactly(RowFactory.create("main", "BRANCH", 
currentSnapshotId, null, null, null));
+    assertThat(mainBranch.get(0).schema().fieldNames())
+        .containsExactly(
+            "name",
+            "type",
+            "snapshot_id",
+            "max_reference_age_in_ms",
+            "min_snapshots_to_keep",
+            "max_snapshot_age_in_ms");
 
     List<Row> testBranch =
         spark
             .sql("SELECT * FROM " + tableName + ".refs WHERE name = 
'testBranch' AND type='BRANCH'")
             .collectAsList();
-    Assert.assertEquals("testBranch", testBranch.get(0).getAs("name"));
-    Assert.assertEquals("BRANCH", testBranch.get(0).getAs("type"));
-    Assert.assertEquals(currentSnapshotId, 
testBranch.get(0).getAs("snapshot_id"));
-    Assert.assertEquals(Long.valueOf(10), 
testBranch.get(0).getAs("max_reference_age_in_ms"));
-    Assert.assertEquals(Integer.valueOf(20), 
testBranch.get(0).getAs("min_snapshots_to_keep"));
-    Assert.assertEquals(Long.valueOf(30), 
testBranch.get(0).getAs("max_snapshot_age_in_ms"));
+    assertThat(testBranch)
+        .hasSize(1)
+        .containsExactly(
+            RowFactory.create("testBranch", "BRANCH", currentSnapshotId, 10L, 
20L, 30L));
+    assertThat(testBranch.get(0).schema().fieldNames())
+        .containsExactly(
+            "name",
+            "type",
+            "snapshot_id",
+            "max_reference_age_in_ms",
+            "min_snapshots_to_keep",
+            "max_snapshot_age_in_ms");
 
     // Check tag entries in refs table
     List<Row> testTag =
         spark
             .sql("SELECT * FROM " + tableName + ".refs WHERE name = 'testTag' 
AND type='TAG'")
             .collectAsList();
-    Assert.assertEquals("testTag", testTag.get(0).getAs("name"));
-    Assert.assertEquals("TAG", testTag.get(0).getAs("type"));
-    Assert.assertEquals(currentSnapshotId, 
testTag.get(0).getAs("snapshot_id"));
-    Assert.assertEquals(Long.valueOf(50), 
testTag.get(0).getAs("max_reference_age_in_ms"));
+    assertThat(testTag)
+        .hasSize(1)
+        .containsExactly(RowFactory.create("testTag", "TAG", 
currentSnapshotId, 50L, null, null));
+    assertThat(testTag.get(0).schema().fieldNames())
+        .containsExactly(
+            "name",
+            "type",
+            "snapshot_id",
+            "max_reference_age_in_ms",
+            "min_snapshots_to_keep",
+            "max_snapshot_age_in_ms");
 
     // Check projection in refs table
     List<Row> testTagProjection =
@@ -651,12 +784,12 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
                     + tableName
                     + ".refs where type='TAG'")
             .collectAsList();
-    Assert.assertEquals("testTag", testTagProjection.get(0).getAs("name"));
-    Assert.assertEquals("TAG", testTagProjection.get(0).getAs("type"));
-    Assert.assertEquals(currentSnapshotId, 
testTagProjection.get(0).getAs("snapshot_id"));
-    Assert.assertEquals(
-        Long.valueOf(50), 
testTagProjection.get(0).getAs("max_reference_age_in_ms"));
-    Assert.assertNull(testTagProjection.get(0).getAs("min_snapshots_to_keep"));
+    assertThat(testTagProjection)
+        .hasSize(1)
+        .containsExactly(RowFactory.create("testTag", "TAG", 
currentSnapshotId, 50L, null));
+    assertThat(testTagProjection.get(0).schema().fieldNames())
+        .containsExactly(
+            "name", "type", "snapshot_id", "max_reference_age_in_ms", 
"min_snapshots_to_keep");
 
     List<Row> mainBranchProjection =
         spark
@@ -665,21 +798,23 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
                     + tableName
                     + ".refs WHERE name = 'main' AND type = 'BRANCH'")
             .collectAsList();
-    Assert.assertEquals("main", mainBranchProjection.get(0).getAs("name"));
-    Assert.assertEquals("BRANCH", mainBranchProjection.get(0).getAs("type"));
+    assertThat(mainBranchProjection)
+        .hasSize(1)
+        .containsExactly(RowFactory.create("main", "BRANCH"));
+    
assertThat(mainBranchProjection.get(0).schema().fieldNames()).containsExactly("name",
 "type");
 
     List<Row> testBranchProjection =
         spark
             .sql(
-                "SELECT type, name, max_reference_age_in_ms, snapshot_id FROM "
+                "SELECT name, type, snapshot_id, max_reference_age_in_ms FROM "
                     + tableName
                     + ".refs WHERE name = 'testBranch' AND type = 'BRANCH'")
             .collectAsList();
-    Assert.assertEquals("testBranch", 
testBranchProjection.get(0).getAs("name"));
-    Assert.assertEquals("BRANCH", testBranchProjection.get(0).getAs("type"));
-    Assert.assertEquals(currentSnapshotId, 
testBranchProjection.get(0).getAs("snapshot_id"));
-    Assert.assertEquals(
-        Long.valueOf(10), 
testBranchProjection.get(0).getAs("max_reference_age_in_ms"));
+    assertThat(testBranchProjection)
+        .hasSize(1)
+        .containsExactly(RowFactory.create("testBranch", "BRANCH", 
currentSnapshotId, 10L));
+    assertThat(testBranchProjection.get(0).schema().fieldNames())
+        .containsExactly("name", "type", "snapshot_id", 
"max_reference_age_in_ms");
   }
 
   /**
@@ -723,4 +858,119 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     Record partition = (Record) file.get(4);
     return partValue.equals(partition.get(0).toString());
   }
+
+  @TestTemplate
+  public void metadataLogEntriesAfterReplacingTable() throws Exception {
+    assumeThat(catalogConfig.get(ICEBERG_CATALOG_TYPE))
+        .as(
+            "need to fix https://github.com/apache/iceberg/issues/11109 before 
enabling this for the REST catalog")
+        .isNotEqualTo(ICEBERG_CATALOG_TYPE_REST);
+
+    sql(
+        "CREATE TABLE %s (id bigint, data string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES "
+            + "('format-version'='%s')",
+        tableName, formatVersion);
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+    assertThat(tableMetadata.snapshots()).isEmpty();
+    assertThat(tableMetadata.snapshotLog()).isEmpty();
+    assertThat(tableMetadata.currentSnapshot()).isNull();
+
+    Object[] firstEntry =
+        row(
+            DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 
1000),
+            tableMetadata.metadataFileLocation(),
+            null,
+            null,
+            null);
+
+    assertThat(sql("SELECT * FROM %s.metadata_log_entries", 
tableName)).containsExactly(firstEntry);
+
+    sql("INSERT INTO %s (id, data) VALUES (1, 'a')", tableName);
+
+    tableMetadata = ((HasTableOperations) table).operations().refresh();
+    assertThat(tableMetadata.snapshots()).hasSize(1);
+    assertThat(tableMetadata.snapshotLog()).hasSize(1);
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+
+    Object[] secondEntry =
+        row(
+            DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 
1000),
+            tableMetadata.metadataFileLocation(),
+            currentSnapshot.snapshotId(),
+            currentSnapshot.schemaId(),
+            currentSnapshot.sequenceNumber());
+
+    assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
+        .containsExactly(firstEntry, secondEntry);
+
+    sql("INSERT INTO %s (id, data) VALUES (1, 'a')", tableName);
+
+    tableMetadata = ((HasTableOperations) table).operations().refresh();
+    assertThat(tableMetadata.snapshots()).hasSize(2);
+    assertThat(tableMetadata.snapshotLog()).hasSize(2);
+    currentSnapshot = tableMetadata.currentSnapshot();
+
+    Object[] thirdEntry =
+        row(
+            DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 
1000),
+            tableMetadata.metadataFileLocation(),
+            currentSnapshot.snapshotId(),
+            currentSnapshot.schemaId(),
+            currentSnapshot.sequenceNumber());
+
+    assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
+        .containsExactly(firstEntry, secondEntry, thirdEntry);
+
+    sql(
+        "CREATE OR REPLACE TABLE %s (id bigint, data string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES "
+            + "('format-version'='%s')",
+        tableName, formatVersion);
+
+    tableMetadata = ((HasTableOperations) table).operations().refresh();
+    assertThat(tableMetadata.snapshots()).hasSize(2);
+    assertThat(tableMetadata.snapshotLog()).hasSize(2);
+
+    // currentSnapshot is null but the metadata_log_entries will refer to the 
last snapshot from the
+    // snapshotLog
+    assertThat(tableMetadata.currentSnapshot()).isNull();
+    HistoryEntry historyEntry = tableMetadata.snapshotLog().get(1);
+    Snapshot lastSnapshot = tableMetadata.snapshot(historyEntry.snapshotId());
+
+    Object[] fourthEntry =
+        row(
+            DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 
1000),
+            tableMetadata.metadataFileLocation(),
+            lastSnapshot.snapshotId(),
+            lastSnapshot.schemaId(),
+            lastSnapshot.sequenceNumber());
+
+    assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
+        .containsExactly(firstEntry, secondEntry, thirdEntry, fourthEntry);
+
+    sql("INSERT INTO %s (id, data) VALUES (1, 'a')", tableName);
+
+    tableMetadata = ((HasTableOperations) table).operations().refresh();
+    assertThat(tableMetadata.snapshots()).hasSize(3);
+    assertThat(tableMetadata.snapshotLog()).hasSize(3);
+    currentSnapshot = tableMetadata.currentSnapshot();
+
+    Object[] fifthEntry =
+        row(
+            DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 
1000),
+            tableMetadata.metadataFileLocation(),
+            currentSnapshot.snapshotId(),
+            currentSnapshot.schemaId(),
+            currentSnapshot.sequenceNumber());
+
+    assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
+        .containsExactly(firstEntry, secondEntry, thirdEntry, fourthEntry, 
fifthEntry);
+  }
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
index 0c319e2bd4..2942413a75 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
@@ -84,6 +84,9 @@ class DVIterator implements CloseableIterator<InternalRow> {
           rowValues.add(deleteFile.contentOffset());
         } else if (fieldId == MetadataColumns.CONTENT_SIZE_IN_BYTES_COLUMN_ID) 
{
           rowValues.add(ScanTaskUtil.contentSizeInBytes(deleteFile));
+        } else if (fieldId == MetadataColumns.DELETE_FILE_ROW_FIELD_ID) {
+          // DVs don't track the row that was deleted
+          rowValues.add(null);
         }
       }
 
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java
index 87a49b6444..6cc100097c 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java
@@ -29,7 +29,7 @@ public abstract class CatalogTestBase extends 
TestBaseWithCatalog {
 
   // these parameters are broken out to avoid changes that need to modify lots 
of test suites
   @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
-  public static Object[][] parameters() {
+  protected static Object[][] parameters() {
     return new Object[][] {
       {
         SparkCatalogConfig.HIVE.catalogName(),


Reply via email to