This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d2046a7d9e Core, Spark: Preserve DV-specific fields for deletion 
vectors (#14351)
d2046a7d9e is described below

commit d2046a7d9e7a94745e7cad610817df6b761dd085
Author: adawrapub <[email protected]>
AuthorDate: Tue Nov 11 23:46:26 2025 -0800

    Core, Spark: Preserve DV-specific fields for deletion vectors (#14351)
    
    DV files (Puffin format) store the referenced data file path in two 
separate locations:
    Manifest Metadata: DeleteFile.referencedDataFile() field
    Puffin Blob Metadata: "referenced-data-file" property inside the blob
    The original implementation only updated the manifest metadata. The Puffin 
blob metadata still contained the old path, causing the DV reader to fail when 
applying deletes at the new location.
    
    I have implemented a two-pronged update strategy. Please let me know if 
there is a better way
    
    1. Manifest Metadata Update
    Added ContentFileUtil.replaceReferencedDataFile() utility method
    Created RewriteTablePathUtil.newPositionDeleteEntry() helper method
    Updates the referencedDataFile field in manifest entries
    2. Puffin Content Update
    Implemented RewriteTablePathUtil.rewriteDVFile() method
    Reads Puffin files, updates blob metadata properties, writes new files
    Preserves the bitmap data while updating path references
---
 .../main/java/org/apache/iceberg/FileMetadata.java |   5 +
 .../org/apache/iceberg/RewriteTablePathUtil.java   | 129 +++++++++++++--
 .../spark/actions/TestRewriteTablePathsAction.java | 175 +++++++++++++--------
 .../spark/actions/TestRewriteTablePathsAction.java | 175 +++++++++++++--------
 .../spark/actions/TestRewriteTablePathsAction.java | 175 +++++++++++++--------
 5 files changed, 456 insertions(+), 203 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/FileMetadata.java 
b/core/src/main/java/org/apache/iceberg/FileMetadata.java
index 4a0668976a..a5266101c2 100644
--- a/core/src/main/java/org/apache/iceberg/FileMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/FileMetadata.java
@@ -109,6 +109,11 @@ public class FileMetadata {
       this.keyMetadata =
           toCopy.keyMetadata() == null ? null : 
ByteBuffers.copy(toCopy.keyMetadata());
       this.sortOrderId = toCopy.sortOrderId();
+      this.splitOffsets = toCopy.splitOffsets();
+      // Preserve DV-specific fields for deletion vectors
+      this.referencedDataFile = toCopy.referencedDataFile();
+      this.contentOffset = toCopy.contentOffset();
+      this.contentSizeInBytes = toCopy.contentSizeInBytes();
       return this;
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java 
b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
index ee7679f5e9..6f58b29315 100644
--- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
+++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +41,11 @@ import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinCompressionCodec;
+import org.apache.iceberg.puffin.PuffinReader;
+import org.apache.iceberg.puffin.PuffinWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -446,16 +452,8 @@ public class RewriteTablePathUtil {
 
     switch (file.content()) {
       case POSITION_DELETES:
-        String targetDeleteFilePath = newPath(file.location(), sourcePrefix, 
targetPrefix);
-        Metrics metricsWithTargetPath =
-            ContentFileUtil.replacePathBounds(file, sourcePrefix, 
targetPrefix);
-        DeleteFile movedFile =
-            FileMetadata.deleteFileBuilder(spec)
-                .copy(file)
-                .withPath(targetDeleteFilePath)
-                .withMetrics(metricsWithTargetPath)
-                .build();
-        appendEntryWithFile(entry, writer, movedFile);
+        DeleteFile posDeleteFile = newPositionDeleteEntry(file, spec, 
sourcePrefix, targetPrefix);
+        appendEntryWithFile(entry, writer, posDeleteFile);
         // keep the following entries in metadata but exclude them from 
copyPlan
         // 1) deleted position delete files
         // 2) entries not changed by snapshotIds
@@ -465,7 +463,7 @@ public class RewriteTablePathUtil {
               .add(
                   Pair.of(
                       stagingPath(file.location(), sourcePrefix, 
stagingLocation),
-                      movedFile.location()));
+                      posDeleteFile.location()));
         }
         result.toRewrite().add(file);
         return result;
@@ -524,6 +522,56 @@ public class RewriteTablePathUtil {
         .build();
   }
 
+  private static DeleteFile newPositionDeleteEntry(
+      DeleteFile file, PartitionSpec spec, String sourcePrefix, String 
targetPrefix) {
+    String path = file.location();
+    Preconditions.checkArgument(
+        path.startsWith(sourcePrefix),
+        "Expected delete file %s to start with prefix: %s",
+        path,
+        sourcePrefix);
+
+    FileMetadata.Builder builder =
+        FileMetadata.deleteFileBuilder(spec)
+            .copy(file)
+            .withPath(newPath(path, sourcePrefix, targetPrefix))
+            .withMetrics(ContentFileUtil.replacePathBounds(file, sourcePrefix, 
targetPrefix));
+
+    // Update referencedDataFile for DV files
+    String newReferencedDataFile =
+        rewriteReferencedDataFilePathForDV(file, sourcePrefix, targetPrefix);
+    if (newReferencedDataFile != null) {
+      builder.withReferencedDataFile(newReferencedDataFile);
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Replace the referenced data file path for a DV (Deletion Vector) file.
+   *
+   * <p>For DV files, returns the updated path with the target prefix. For 
non-DV files or files
+   * without a referenced data file, returns null.
+   *
+   * @param deleteFile delete file to check
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @return updated referenced data file path, or null if not applicable
+   */
+  private static String rewriteReferencedDataFilePathForDV(
+      DeleteFile deleteFile, String sourcePrefix, String targetPrefix) {
+    if (!ContentFileUtil.isDV(deleteFile) || deleteFile.referencedDataFile() 
== null) {
+      return null;
+    }
+
+    String oldReferencedDataFile = deleteFile.referencedDataFile();
+    if (oldReferencedDataFile.startsWith(sourcePrefix)) {
+      return newPath(oldReferencedDataFile, sourcePrefix, targetPrefix);
+    }
+
+    return oldReferencedDataFile;
+  }
+
   /** Class providing engine-specific methods to read and write position 
delete files. */
   public interface PositionDeleteReaderWriter extends Serializable {
     CloseableIterable<Record> reader(InputFile inputFile, FileFormat format, 
PartitionSpec spec);
@@ -562,6 +610,14 @@ public class RewriteTablePathUtil {
       throw new UnsupportedOperationException(
           String.format("Expected delete file %s to start with prefix: %s", 
path, sourcePrefix));
     }
+
+    // DV files (Puffin format for v3+) need special handling to rewrite 
internal blob metadata
+    if (ContentFileUtil.isDV(deleteFile)) {
+      rewriteDVFile(deleteFile, outputFile, io, sourcePrefix, targetPrefix);
+      return;
+    }
+
+    // For non-DV position delete files (v2), rewrite using the reader/writer
     InputFile sourceFile = io.newInputFile(path);
     try (CloseableIterable<Record> reader =
         posDeleteReaderWriter.reader(sourceFile, deleteFile.format(), spec)) {
@@ -592,6 +648,57 @@ public class RewriteTablePathUtil {
     }
   }
 
+  /**
+   * Rewrite a DV (Deletion Vector) file, updating the referenced data file 
paths in blob metadata.
+   *
+   * @param deleteFile source DV file to be rewritten
+   * @param outputFile output file to write the rewritten DV to
+   * @param io file io
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix to replace it
+   */
+  private static void rewriteDVFile(
+      DeleteFile deleteFile,
+      OutputFile outputFile,
+      FileIO io,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    List<Blob> rewrittenBlobs = Lists.newArrayList();
+    try (PuffinReader reader = 
Puffin.read(io.newInputFile(deleteFile.location())).build()) {
+      // Read all blobs and rewrite them with updated referenced data file 
paths
+      for (Pair<org.apache.iceberg.puffin.BlobMetadata, ByteBuffer> blobPair :
+          reader.readAll(reader.fileMetadata().blobs())) {
+        org.apache.iceberg.puffin.BlobMetadata blobMetadata = blobPair.first();
+        ByteBuffer blobData = blobPair.second();
+
+        // Get the original properties and update the referenced data file path
+        Map<String, String> properties = 
Maps.newHashMap(blobMetadata.properties());
+        String referencedDataFile = properties.get("referenced-data-file");
+        if (referencedDataFile != null && 
referencedDataFile.startsWith(sourcePrefix)) {
+          String newReferencedDataFile = newPath(referencedDataFile, 
sourcePrefix, targetPrefix);
+          properties.put("referenced-data-file", newReferencedDataFile);
+        }
+
+        // Create a new blob with updated properties
+        rewrittenBlobs.add(
+            new Blob(
+                blobMetadata.type(),
+                blobMetadata.inputFields(),
+                blobMetadata.snapshotId(),
+                blobMetadata.sequenceNumber(),
+                blobData,
+                
PuffinCompressionCodec.forName(blobMetadata.compressionCodec()),
+                properties));
+      }
+    }
+
+    try (PuffinWriter writer =
+        
Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) {
+      rewrittenBlobs.forEach(writer::write);
+    }
+  }
+
   private static PositionDelete newPositionDeleteRecord(
       Record record, String sourcePrefix, String targetPrefix) {
     PositionDelete delete = PositionDelete.create();
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 6dac5d5da0..0bcaf0af65 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -22,6 +22,7 @@ import static 
org.apache.iceberg.spark.actions.RewriteTablePathSparkAction.NOT_A
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -39,6 +40,9 @@ import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -59,6 +63,7 @@ import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -80,10 +85,12 @@ import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.BroadcastBlockId;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 import scala.Tuple2;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestRewriteTablePathsAction extends TestBase {
 
   @TempDir private Path staging;
@@ -91,6 +98,13 @@ public class TestRewriteTablePathsAction extends TestBase {
   @TempDir private Path newTableDir;
   @TempDir private Path targetTableDir;
 
+  @Parameters(name = "formatVersion = {0}")
+  protected static List<Integer> formatVersions() {
+    return TestHelpers.V2_AND_ABOVE;
+  }
+
+  @Parameter private int formatVersion;
+
   protected ActionsProvider actions() {
     return SparkActions.get();
   }
@@ -135,7 +149,15 @@ public class TestRewriteTablePathsAction extends TestBase {
 
   private Table createTableWithSnapshots(
       String location, int snapshotNumber, Map<String, String> properties, 
String mode) {
-    Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
properties, location);
+    Table newTable =
+        TABLES.create(
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.<String, String>builder()
+                .put(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion))
+                .putAll(properties)
+                .build(),
+            location);
 
     List<ThreeColumnRecord> records =
         Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
@@ -159,7 +181,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs);
   }
 
-  @Test
+  @TestTemplate
   public void testRewritePath() throws Exception {
     String targetTableLocation = targetTableLocation();
 
@@ -207,7 +229,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     assertEquals("Rows should match after copy", expected, actual);
   }
 
-  @Test
+  @TestTemplate
   public void testSameLocations() {
     assertThatThrownBy(
             () ->
@@ -220,7 +242,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .hasMessageContaining("Source prefix cannot be the same as target 
prefix");
   }
 
-  @Test
+  @TestTemplate
   public void testStartVersion() throws Exception {
     RewriteTablePath.Result result =
         actions()
@@ -244,7 +266,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .isEmpty();
   }
 
-  @Test
+  @TestTemplate
   public void testIncrementalRewrite() throws Exception {
     String location = newTableLocation();
     Table sourceTable =
@@ -291,7 +313,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     assertEquals("Rows should match after copy", expected, actual);
   }
 
-  @Test
+  @TestTemplate
   public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path 
location2)
       throws Exception {
     String location = newTableLocation();
@@ -316,7 +338,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(3, 3, 3, 12, result1);
   }
 
-  @Test
+  @TestTemplate
   public void testFullTableRewritePath() throws Exception {
     RewriteTablePath.Result result =
         actions()
@@ -327,7 +349,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(3, 2, 2, 9, result);
   }
 
-  @Test
+  @TestTemplate
   public void testManifestRewriteAndIncrementalCopy() throws Exception {
     RewriteTablePath.Result initialResult =
         actions()
@@ -354,7 +376,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(1, 1, addedManifest, 3, postReweiteResult);
   }
 
-  @Test
+  @TestTemplate
   public void testDeleteDataFile() throws Exception {
     List<String> validDataFiles =
         spark
@@ -386,17 +408,17 @@ public class TestRewriteTablePathsAction extends TestBase 
{
         .hasSize(1);
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeletesParquet() throws Exception {
     runPositionDeletesTest("parquet");
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeletesAvro() throws Exception {
     runPositionDeletesTest("avro");
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeletesOrc() throws Exception {
     runPositionDeletesTest("orc");
   }
@@ -427,7 +449,8 @@ public class TestRewriteTablePathsAction extends TestBase {
         FileHelpers.writeDeleteFile(
                 tableWithPosDeletes,
                 
tableWithPosDeletes.io().newOutputFile(file.toURI().toString()),
-                deletes)
+                deletes,
+                formatVersion)
             .first();
 
     tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
@@ -454,7 +477,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .hasSize(1);
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeleteWithRow() throws Exception {
     String dataFileLocation =
         
table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();
@@ -467,7 +490,8 @@ public class TestRewriteTablePathsAction extends TestBase {
                     .toURI()
                     .toString());
     deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA", 
"AAAA"));
-    DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table, 
deleteFile, null, deletes);
+    DeleteFile positionDeletes =
+        FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes, 
formatVersion);
     table.newRowDelta().addDeletes(positionDeletes).commit();
 
     
assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
@@ -486,18 +510,24 @@ public class TestRewriteTablePathsAction extends TestBase 
{
     // copy the metadata files and data files
     copyTableFiles(result);
 
-    // check copied position delete row
-    Object[] deletedRow = (Object[]) rows(targetTableLocation() + 
"#position_deletes").get(0)[2];
-    assertEquals(
-        "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", 
"AAAA"}, deletedRow);
+    // check copied position delete row - only v2 stores row data with 
position deletes
+    // v3+ uses Deletion Vectors (DV) which only store position information
+    if (formatVersion == 2) {
+      Object[] deletedRow = (Object[]) rows(targetTableLocation() + 
"#position_deletes").get(0)[2];
+      assertEquals(
+          "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", 
"AAAA"}, deletedRow);
+    }
 
     // Positional delete affects a single row, so only one row must remain
     
assertThat(spark.read().format("iceberg").load(targetTableLocation()).collectAsList())
         .hasSize(1);
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeletesAcrossFiles() throws Exception {
+    assumeThat(formatVersion)
+        .as("Can't write multiple deletes into a single v3 delete file")
+        .isEqualTo(2);
     Stream<DataFile> allFiles =
         StreamSupport.stream(table.snapshots().spliterator(), false)
             .flatMap(s -> 
StreamSupport.stream(s.addedDataFiles(table.io()).spliterator(), false));
@@ -510,7 +540,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     File file = new File(removePrefix(table.location() + 
"/data/deeply/nested/file.parquet"));
     DeleteFile positionDeletes =
         FileHelpers.writeDeleteFile(
-                table, table.io().newOutputFile(file.toURI().toString()), 
deletes)
+                table, table.io().newOutputFile(file.toURI().toString()), 
deletes, formatVersion)
             .first();
 
     table.newRowDelta().addDeletes(positionDeletes).commit();
@@ -535,7 +565,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .isEmpty();
   }
 
-  @Test
+  @TestTemplate
   public void testEqualityDeletes() throws Exception {
     Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);
 
@@ -590,7 +620,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .hasSize(2);
   }
 
-  @Test
+  @TestTemplate
   public void testFullTableRewritePathWithDeletedVersionFiles() throws 
Exception {
     String location = newTableLocation();
     Table sourceTable = createTableWithSnapshots(location, 2);
@@ -636,7 +666,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         result);
   }
 
-  @Test
+  @TestTemplate
   public void testRewritePathWithoutSnapshot() throws Exception {
     RewriteTablePath.Result result =
         actions()
@@ -649,7 +679,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(1, 0, 0, 1, result);
   }
 
-  @Test
+  @TestTemplate
   public void testExpireSnapshotBeforeRewrite() throws Exception {
     // expire one snapshot
     
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -664,7 +694,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(4, 1, 2, 9, result);
   }
 
-  @Test
+  @TestTemplate
   public void testRewritePathWithNonLiveEntry() throws Exception {
     String location = newTableLocation();
     // first overwrite generate 1 manifest and 1 data file
@@ -729,7 +759,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation);
   }
 
-  @Test
+  @TestTemplate
   public void testStartSnapshotWithoutValidSnapshot() throws Exception {
     // expire one snapshot
     
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -748,7 +778,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(2, 1, 1, 5, result);
   }
 
-  @Test
+  @TestTemplate
   public void testMoveTheVersionExpireSnapshot() throws Exception {
     // expire one snapshot
     
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -766,7 +796,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(1, 0, 0, 1, result);
   }
 
-  @Test
+  @TestTemplate
   public void testMoveVersionWithInvalidSnapshots() {
     // expire one snapshot
     
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -785,7 +815,7 @@ public class TestRewriteTablePathsAction extends TestBase {
                 + "Please choose an earlier version without invalid 
snapshots.");
   }
 
-  @Test
+  @TestTemplate
   public void testRollBack() throws Exception {
     long secondSnapshotId = table.currentSnapshot().snapshotId();
 
@@ -812,7 +842,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(6, 3, 3, 15, result);
   }
 
-  @Test
+  @TestTemplate
   public void testWriteAuditPublish() throws Exception {
     // enable WAP
     table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, 
"true").commit();
@@ -837,7 +867,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(5, 3, 3, 14, result);
   }
 
-  @Test
+  @TestTemplate
   public void testSchemaChange() throws Exception {
     // change the schema
     table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
@@ -854,7 +884,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(4, 2, 2, 10, result);
   }
 
-  @Test
+  @TestTemplate
   public void testSnapshotIdInheritanceEnabled() throws Exception {
     String sourceTableLocation = newTableLocation();
     Map<String, String> properties = Maps.newHashMap();
@@ -872,7 +902,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(3, 2, 2, 9, result);
   }
 
-  @Test
+  @TestTemplate
   public void testMetadataCompression() throws Exception {
     String sourceTableLocation = newTableLocation();
     Map<String, String> properties = Maps.newHashMap();
@@ -898,7 +928,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(2, 2, 2, 8, result);
   }
 
-  @Test
+  @TestTemplate
   public void testInvalidArgs() {
     RewriteTablePath actions = actions().rewriteTablePath(table);
 
@@ -931,12 +961,12 @@ public class TestRewriteTablePathsAction extends TestBase 
{
         .hasMessageContaining("End version('null') cannot be empty");
   }
 
-  @Test
+  @TestTemplate
   public void testTableWithManyPartitionStatisticFile() throws IOException {
     String sourceTableLocation = newTableLocation();
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    String tableName = "v2tblwithPartStats";
+    Map<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion));
+    String tableName = String.format("v%stblwithPartStats", formatVersion);
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", 
tableName, 0, "c1");
 
@@ -963,12 +993,12 @@ public class TestRewriteTablePathsAction extends TestBase 
{
         result, "partition-stats", sourceTableLocation, targetTableLocation);
   }
 
-  @Test
+  @TestTemplate
   public void testTableWithManyStatisticFiles() throws IOException {
     String sourceTableLocation = newTableLocation();
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    String tableName = "v2tblwithmanystats";
+    Map<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion));
+    String tableName = String.format("v%stblwithmanystats", formatVersion);
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", 
tableName, 0);
 
@@ -992,12 +1022,12 @@ public class TestRewriteTablePathsAction extends 
TestBase {
         iterations * 2 + 1, iterations, iterations, iterations, iterations * 6 
+ 1, result);
   }
 
-  @Test
+  @TestTemplate
   public void testStatisticsFileSourcePath() throws IOException {
     String sourceTableLocation = newTableLocation();
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    String tableName = "v2tblwithstats";
+    Map<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion));
+    String tableName = String.format("v%stblwithstats", formatVersion);
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", 
tableName, 1);
 
@@ -1020,13 +1050,17 @@ public class TestRewriteTablePathsAction extends 
TestBase {
     findAndAssertFileInFileList(result, ".stats", sourceTableLocation, 
targetTableLocation);
   }
 
-  @Test
+  @TestTemplate
   public void testMetadataCompressionWithMetastoreTable() throws Exception {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
     Table sourceTable =
         createMetastoreTable(
-            newTableLocation(), properties, "default", 
"testMetadataCompression", 2);
+            newTableLocation(),
+            properties,
+            "default",
+            String.format("v%sMetadataCompression", formatVersion),
+            2);
 
     TableMetadata currentMetadata = currentMetadata(sourceTable);
 
@@ -1054,10 +1088,11 @@ public class TestRewriteTablePathsAction extends 
TestBase {
   }
 
   // Metastore table tests
-  @Test
+  @TestTemplate
   public void testMetadataLocationChange() throws Exception {
+    String tableName = String.format("v%stblWithLocation", formatVersion);
     Table sourceTable =
-        createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", 
"tbl", 1);
+        createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", 
tableName, 1);
     String metadataFilePath = 
currentMetadata(sourceTable).metadataFileLocation();
 
     String newMetadataDir = "new-metadata-dir";
@@ -1066,7 +1101,7 @@ public class TestRewriteTablePathsAction extends TestBase 
{
         .set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() + 
newMetadataDir)
         .commit();
 
-    spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')");
+    sql("insert into hive.default.%s values (1, 'AAAAAAAAAA', 'AAAA')", 
tableName);
     sourceTable.refresh();
 
     // copy table
@@ -1099,12 +1134,15 @@ public class TestRewriteTablePathsAction extends 
TestBase {
     checkFileNum(2, 1, 1, 5, result2);
   }
 
-  @Test
+  @TestTemplate
   public void testDeleteFrom() throws Exception {
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    properties.put("write.delete.mode", "merge-on-read");
-    String tableName = "v2tbl";
+    Map<String, String> properties =
+        ImmutableMap.of(
+            TableProperties.FORMAT_VERSION,
+            String.valueOf(formatVersion),
+            "write.delete.mode",
+            "merge-on-read");
+    String tableName = String.format("v%stbl", formatVersion);
     Table sourceTable =
         createMetastoreTable(newTableLocation(), properties, "default", 
tableName, 0);
     // ingest data
@@ -1152,7 +1190,7 @@ public class TestRewriteTablePathsAction extends TestBase 
{
     // register table
     String metadataLocation = 
currentMetadata(sourceTable).metadataFileLocation();
     String versionFile = fileName(metadataLocation);
-    String targetTableName = "copiedV2Table";
+    String targetTableName = String.format("copiedV%sTable", formatVersion);
     TableIdentifier tableIdentifier = TableIdentifier.of("default", 
targetTableName);
     catalog.registerTable(tableIdentifier, targetTableLocation() + 
"/metadata/" + versionFile);
 
@@ -1168,7 +1206,7 @@ public class TestRewriteTablePathsAction extends TestBase 
{
     assertEquals("Rows must match", originalData, copiedData);
   }
 
-  @Test
+  @TestTemplate
   public void testKryoDeserializeBroadcastValues() {
     sparkContext.getConf().set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
     RewriteTablePathSparkAction action =
@@ -1179,8 +1217,11 @@ public class TestRewriteTablePathsAction extends 
TestBase {
     assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
   }
 
-  @Test
+  @TestTemplate
   public void testNestedDirectoryStructurePreservation() throws Exception {
+    assumeThat(formatVersion)
+        .as("Can't add multiple DVs for the same data file in v3")
+        .isEqualTo(2);
     String sourceTableLocation = newTableLocation();
     Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);
 
@@ -1217,12 +1258,18 @@ public class TestRewriteTablePathsAction extends 
TestBase {
 
     DeleteFile positionDeletes1 =
         FileHelpers.writeDeleteFile(
-                sourceTable, 
sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+                sourceTable,
+                sourceTable.io().newOutputFile(file1.toURI().toString()),
+                deletes1,
+                formatVersion)
             .first();
 
     DeleteFile positionDeletes2 =
         FileHelpers.writeDeleteFile(
-                sourceTable, 
sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+                sourceTable,
+                sourceTable.io().newOutputFile(file2.toURI().toString()),
+                deletes2,
+                formatVersion)
             .first();
 
     sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
@@ -1266,7 +1313,7 @@ public class TestRewriteTablePathsAction extends TestBase 
{
     assertThat(targetPath2).startsWith(targetTableLocation());
   }
 
-  @Test
+  @TestTemplate
   public void testRewritePathWithoutCreateFileList() throws Exception {
     String targetTableLocation = targetTableLocation();
 
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 6dac5d5da0..0bcaf0af65 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -22,6 +22,7 @@ import static 
org.apache.iceberg.spark.actions.RewriteTablePathSparkAction.NOT_A
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -39,6 +40,9 @@ import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -59,6 +63,7 @@ import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -80,10 +85,12 @@ import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.BroadcastBlockId;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 import scala.Tuple2;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestRewriteTablePathsAction extends TestBase {
 
   @TempDir private Path staging;
@@ -91,6 +98,13 @@ public class TestRewriteTablePathsAction extends TestBase {
   @TempDir private Path newTableDir;
   @TempDir private Path targetTableDir;
 
+  @Parameters(name = "formatVersion = {0}")
+  protected static List<Integer> formatVersions() {
+    return TestHelpers.V2_AND_ABOVE;
+  }
+
+  @Parameter private int formatVersion;
+
   protected ActionsProvider actions() {
     return SparkActions.get();
   }
@@ -135,7 +149,15 @@ public class TestRewriteTablePathsAction extends TestBase {
 
   private Table createTableWithSnapshots(
       String location, int snapshotNumber, Map<String, String> properties, 
String mode) {
-    Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
properties, location);
+    Table newTable =
+        TABLES.create(
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.<String, String>builder()
+                .put(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion))
+                .putAll(properties)
+                .build(),
+            location);
 
     List<ThreeColumnRecord> records =
         Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
@@ -159,7 +181,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs);
   }
 
-  @Test
+  @TestTemplate
   public void testRewritePath() throws Exception {
     String targetTableLocation = targetTableLocation();
 
@@ -207,7 +229,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     assertEquals("Rows should match after copy", expected, actual);
   }
 
-  @Test
+  @TestTemplate
   public void testSameLocations() {
     assertThatThrownBy(
             () ->
@@ -220,7 +242,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .hasMessageContaining("Source prefix cannot be the same as target 
prefix");
   }
 
-  @Test
+  @TestTemplate
   public void testStartVersion() throws Exception {
     RewriteTablePath.Result result =
         actions()
@@ -244,7 +266,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .isEmpty();
   }
 
-  @Test
+  @TestTemplate
   public void testIncrementalRewrite() throws Exception {
     String location = newTableLocation();
     Table sourceTable =
@@ -291,7 +313,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     assertEquals("Rows should match after copy", expected, actual);
   }
 
-  @Test
+  @TestTemplate
   public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path 
location2)
       throws Exception {
     String location = newTableLocation();
@@ -316,7 +338,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(3, 3, 3, 12, result1);
   }
 
-  @Test
+  @TestTemplate
   public void testFullTableRewritePath() throws Exception {
     RewriteTablePath.Result result =
         actions()
@@ -327,7 +349,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(3, 2, 2, 9, result);
   }
 
-  @Test
+  @TestTemplate
   public void testManifestRewriteAndIncrementalCopy() throws Exception {
     RewriteTablePath.Result initialResult =
         actions()
@@ -354,7 +376,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(1, 1, addedManifest, 3, postReweiteResult);
   }
 
-  @Test
+  @TestTemplate
   public void testDeleteDataFile() throws Exception {
     List<String> validDataFiles =
         spark
@@ -386,17 +408,17 @@ public class TestRewriteTablePathsAction extends TestBase 
{
         .hasSize(1);
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeletesParquet() throws Exception {
     runPositionDeletesTest("parquet");
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeletesAvro() throws Exception {
     runPositionDeletesTest("avro");
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeletesOrc() throws Exception {
     runPositionDeletesTest("orc");
   }
@@ -427,7 +449,8 @@ public class TestRewriteTablePathsAction extends TestBase {
         FileHelpers.writeDeleteFile(
                 tableWithPosDeletes,
                 
tableWithPosDeletes.io().newOutputFile(file.toURI().toString()),
-                deletes)
+                deletes,
+                formatVersion)
             .first();
 
     tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
@@ -454,7 +477,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .hasSize(1);
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeleteWithRow() throws Exception {
     String dataFileLocation =
         
table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();
@@ -467,7 +490,8 @@ public class TestRewriteTablePathsAction extends TestBase {
                     .toURI()
                     .toString());
     deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA", 
"AAAA"));
-    DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table, 
deleteFile, null, deletes);
+    DeleteFile positionDeletes =
+        FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes, 
formatVersion);
     table.newRowDelta().addDeletes(positionDeletes).commit();
 
     
assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
@@ -486,18 +510,24 @@ public class TestRewriteTablePathsAction extends TestBase 
{
     // copy the metadata files and data files
     copyTableFiles(result);
 
-    // check copied position delete row
-    Object[] deletedRow = (Object[]) rows(targetTableLocation() + 
"#position_deletes").get(0)[2];
-    assertEquals(
-        "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", 
"AAAA"}, deletedRow);
+    // check copied position delete row - only v2 stores row data with 
position deletes
+    // v3+ uses Deletion Vectors (DV) which only store position information
+    if (formatVersion == 2) {
+      Object[] deletedRow = (Object[]) rows(targetTableLocation() + 
"#position_deletes").get(0)[2];
+      assertEquals(
+          "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", 
"AAAA"}, deletedRow);
+    }
 
     // Positional delete affects a single row, so only one row must remain
     
assertThat(spark.read().format("iceberg").load(targetTableLocation()).collectAsList())
         .hasSize(1);
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeletesAcrossFiles() throws Exception {
+    assumeThat(formatVersion)
+        .as("Can't write multiple deletes into a single v3 delete file")
+        .isEqualTo(2);
     Stream<DataFile> allFiles =
         StreamSupport.stream(table.snapshots().spliterator(), false)
             .flatMap(s -> 
StreamSupport.stream(s.addedDataFiles(table.io()).spliterator(), false));
@@ -510,7 +540,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     File file = new File(removePrefix(table.location() + 
"/data/deeply/nested/file.parquet"));
     DeleteFile positionDeletes =
         FileHelpers.writeDeleteFile(
-                table, table.io().newOutputFile(file.toURI().toString()), 
deletes)
+                table, table.io().newOutputFile(file.toURI().toString()), 
deletes, formatVersion)
             .first();
 
     table.newRowDelta().addDeletes(positionDeletes).commit();
@@ -535,7 +565,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .isEmpty();
   }
 
-  @Test
+  @TestTemplate
   public void testEqualityDeletes() throws Exception {
     Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);
 
@@ -590,7 +620,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .hasSize(2);
   }
 
-  @Test
+  @TestTemplate
   public void testFullTableRewritePathWithDeletedVersionFiles() throws 
Exception {
     String location = newTableLocation();
     Table sourceTable = createTableWithSnapshots(location, 2);
@@ -636,7 +666,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         result);
   }
 
-  @Test
+  @TestTemplate
   public void testRewritePathWithoutSnapshot() throws Exception {
     RewriteTablePath.Result result =
         actions()
@@ -649,7 +679,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(1, 0, 0, 1, result);
   }
 
-  @Test
+  @TestTemplate
   public void testExpireSnapshotBeforeRewrite() throws Exception {
     // expire one snapshot
     
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -664,7 +694,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(4, 1, 2, 9, result);
   }
 
-  @Test
+  @TestTemplate
   public void testRewritePathWithNonLiveEntry() throws Exception {
     String location = newTableLocation();
     // first overwrite generate 1 manifest and 1 data file
@@ -729,7 +759,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation);
   }
 
-  @Test
+  @TestTemplate
   public void testStartSnapshotWithoutValidSnapshot() throws Exception {
     // expire one snapshot
     
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -748,7 +778,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(2, 1, 1, 5, result);
   }
 
-  @Test
+  @TestTemplate
   public void testMoveTheVersionExpireSnapshot() throws Exception {
     // expire one snapshot
     
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -766,7 +796,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(1, 0, 0, 1, result);
   }
 
-  @Test
+  @TestTemplate
   public void testMoveVersionWithInvalidSnapshots() {
     // expire one snapshot
     
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -785,7 +815,7 @@ public class TestRewriteTablePathsAction extends TestBase {
                 + "Please choose an earlier version without invalid 
snapshots.");
   }
 
-  @Test
+  @TestTemplate
   public void testRollBack() throws Exception {
     long secondSnapshotId = table.currentSnapshot().snapshotId();
 
@@ -812,7 +842,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(6, 3, 3, 15, result);
   }
 
-  @Test
+  @TestTemplate
   public void testWriteAuditPublish() throws Exception {
     // enable WAP
     table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, 
"true").commit();
@@ -837,7 +867,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(5, 3, 3, 14, result);
   }
 
-  @Test
+  @TestTemplate
   public void testSchemaChange() throws Exception {
     // change the schema
     table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
@@ -854,7 +884,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(4, 2, 2, 10, result);
   }
 
-  @Test
+  @TestTemplate
   public void testSnapshotIdInheritanceEnabled() throws Exception {
     String sourceTableLocation = newTableLocation();
     Map<String, String> properties = Maps.newHashMap();
@@ -872,7 +902,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(3, 2, 2, 9, result);
   }
 
-  @Test
+  @TestTemplate
   public void testMetadataCompression() throws Exception {
     String sourceTableLocation = newTableLocation();
     Map<String, String> properties = Maps.newHashMap();
@@ -898,7 +928,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(2, 2, 2, 8, result);
   }
 
-  @Test
+  @TestTemplate
   public void testInvalidArgs() {
     RewriteTablePath actions = actions().rewriteTablePath(table);
 
@@ -931,12 +961,12 @@ public class TestRewriteTablePathsAction extends TestBase 
{
         .hasMessageContaining("End version('null') cannot be empty");
   }
 
-  @Test
+  @TestTemplate
   public void testTableWithManyPartitionStatisticFile() throws IOException {
     String sourceTableLocation = newTableLocation();
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    String tableName = "v2tblwithPartStats";
+    Map<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion));
+    String tableName = String.format("v%stblwithPartStats", formatVersion);
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", 
tableName, 0, "c1");
 
@@ -963,12 +993,12 @@ public class TestRewriteTablePathsAction extends TestBase 
{
         result, "partition-stats", sourceTableLocation, targetTableLocation);
   }
 
-  @Test
+  @TestTemplate
   public void testTableWithManyStatisticFiles() throws IOException {
     String sourceTableLocation = newTableLocation();
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    String tableName = "v2tblwithmanystats";
+    Map<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion));
+    String tableName = String.format("v%stblwithmanystats", formatVersion);
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", 
tableName, 0);
 
@@ -992,12 +1022,12 @@ public class TestRewriteTablePathsAction extends 
TestBase {
         iterations * 2 + 1, iterations, iterations, iterations, iterations * 6 
+ 1, result);
   }
 
-  @Test
+  @TestTemplate
   public void testStatisticsFileSourcePath() throws IOException {
     String sourceTableLocation = newTableLocation();
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    String tableName = "v2tblwithstats";
+    Map<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion));
+    String tableName = String.format("v%stblwithstats", formatVersion);
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", 
tableName, 1);
 
@@ -1020,13 +1050,17 @@ public class TestRewriteTablePathsAction extends 
TestBase {
     findAndAssertFileInFileList(result, ".stats", sourceTableLocation, 
targetTableLocation);
   }
 
-  @Test
+  @TestTemplate
   public void testMetadataCompressionWithMetastoreTable() throws Exception {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
     Table sourceTable =
         createMetastoreTable(
-            newTableLocation(), properties, "default", 
"testMetadataCompression", 2);
+            newTableLocation(),
+            properties,
+            "default",
+            String.format("v%sMetadataCompression", formatVersion),
+            2);
 
     TableMetadata currentMetadata = currentMetadata(sourceTable);
 
@@ -1054,10 +1088,11 @@ public class TestRewriteTablePathsAction extends 
TestBase {
   }
 
   // Metastore table tests
-  @Test
+  @TestTemplate
   public void testMetadataLocationChange() throws Exception {
+    String tableName = String.format("v%stblWithLocation", formatVersion);
     Table sourceTable =
-        createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", 
"tbl", 1);
+        createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", 
tableName, 1);
     String metadataFilePath = 
currentMetadata(sourceTable).metadataFileLocation();
 
     String newMetadataDir = "new-metadata-dir";
@@ -1066,7 +1101,7 @@ public class TestRewriteTablePathsAction extends TestBase 
{
         .set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() + 
newMetadataDir)
         .commit();
 
-    spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')");
+    sql("insert into hive.default.%s values (1, 'AAAAAAAAAA', 'AAAA')", 
tableName);
     sourceTable.refresh();
 
     // copy table
@@ -1099,12 +1134,15 @@ public class TestRewriteTablePathsAction extends 
TestBase {
     checkFileNum(2, 1, 1, 5, result2);
   }
 
-  @Test
+  @TestTemplate
   public void testDeleteFrom() throws Exception {
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    properties.put("write.delete.mode", "merge-on-read");
-    String tableName = "v2tbl";
+    Map<String, String> properties =
+        ImmutableMap.of(
+            TableProperties.FORMAT_VERSION,
+            String.valueOf(formatVersion),
+            "write.delete.mode",
+            "merge-on-read");
+    String tableName = String.format("v%stbl", formatVersion);
     Table sourceTable =
         createMetastoreTable(newTableLocation(), properties, "default", 
tableName, 0);
     // ingest data
@@ -1152,7 +1190,7 @@ public class TestRewriteTablePathsAction extends TestBase 
{
     // register table
     String metadataLocation = 
currentMetadata(sourceTable).metadataFileLocation();
     String versionFile = fileName(metadataLocation);
-    String targetTableName = "copiedV2Table";
+    String targetTableName = String.format("copiedV%sTable", formatVersion);
     TableIdentifier tableIdentifier = TableIdentifier.of("default", 
targetTableName);
     catalog.registerTable(tableIdentifier, targetTableLocation() + 
"/metadata/" + versionFile);
 
@@ -1168,7 +1206,7 @@ public class TestRewriteTablePathsAction extends TestBase 
{
     assertEquals("Rows must match", originalData, copiedData);
   }
 
-  @Test
+  @TestTemplate
   public void testKryoDeserializeBroadcastValues() {
     sparkContext.getConf().set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
     RewriteTablePathSparkAction action =
@@ -1179,8 +1217,11 @@ public class TestRewriteTablePathsAction extends 
TestBase {
     assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
   }
 
-  @Test
+  @TestTemplate
   public void testNestedDirectoryStructurePreservation() throws Exception {
+    assumeThat(formatVersion)
+        .as("Can't add multiple DVs for the same data file in v3")
+        .isEqualTo(2);
     String sourceTableLocation = newTableLocation();
     Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);
 
@@ -1217,12 +1258,18 @@ public class TestRewriteTablePathsAction extends 
TestBase {
 
     DeleteFile positionDeletes1 =
         FileHelpers.writeDeleteFile(
-                sourceTable, 
sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+                sourceTable,
+                sourceTable.io().newOutputFile(file1.toURI().toString()),
+                deletes1,
+                formatVersion)
             .first();
 
     DeleteFile positionDeletes2 =
         FileHelpers.writeDeleteFile(
-                sourceTable, 
sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+                sourceTable,
+                sourceTable.io().newOutputFile(file2.toURI().toString()),
+                deletes2,
+                formatVersion)
             .first();
 
     sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
@@ -1266,7 +1313,7 @@ public class TestRewriteTablePathsAction extends TestBase 
{
     assertThat(targetPath2).startsWith(targetTableLocation());
   }
 
-  @Test
+  @TestTemplate
   public void testRewritePathWithoutCreateFileList() throws Exception {
     String targetTableLocation = targetTableLocation();
 
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 6dac5d5da0..0bcaf0af65 100644
--- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -22,6 +22,7 @@ import static 
org.apache.iceberg.spark.actions.RewriteTablePathSparkAction.NOT_A
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -39,6 +40,9 @@ import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -59,6 +63,7 @@ import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -80,10 +85,12 @@ import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.BroadcastBlockId;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 import scala.Tuple2;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestRewriteTablePathsAction extends TestBase {
 
   @TempDir private Path staging;
@@ -91,6 +98,13 @@ public class TestRewriteTablePathsAction extends TestBase {
   @TempDir private Path newTableDir;
   @TempDir private Path targetTableDir;
 
+  @Parameters(name = "formatVersion = {0}")
+  protected static List<Integer> formatVersions() {
+    return TestHelpers.V2_AND_ABOVE;
+  }
+
+  @Parameter private int formatVersion;
+
   protected ActionsProvider actions() {
     return SparkActions.get();
   }
@@ -135,7 +149,15 @@ public class TestRewriteTablePathsAction extends TestBase {
 
   private Table createTableWithSnapshots(
       String location, int snapshotNumber, Map<String, String> properties, 
String mode) {
-    Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
properties, location);
+    Table newTable =
+        TABLES.create(
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.<String, String>builder()
+                .put(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion))
+                .putAll(properties)
+                .build(),
+            location);
 
     List<ThreeColumnRecord> records =
         Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
@@ -159,7 +181,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs);
   }
 
-  @Test
+  @TestTemplate
   public void testRewritePath() throws Exception {
     String targetTableLocation = targetTableLocation();
 
@@ -207,7 +229,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     assertEquals("Rows should match after copy", expected, actual);
   }
 
-  @Test
+  @TestTemplate
   public void testSameLocations() {
     assertThatThrownBy(
             () ->
@@ -220,7 +242,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .hasMessageContaining("Source prefix cannot be the same as target 
prefix");
   }
 
-  @Test
+  @TestTemplate
   public void testStartVersion() throws Exception {
     RewriteTablePath.Result result =
         actions()
@@ -244,7 +266,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .isEmpty();
   }
 
-  @Test
+  @TestTemplate
   public void testIncrementalRewrite() throws Exception {
     String location = newTableLocation();
     Table sourceTable =
@@ -291,7 +313,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     assertEquals("Rows should match after copy", expected, actual);
   }
 
-  @Test
+  @TestTemplate
   public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path 
location2)
       throws Exception {
     String location = newTableLocation();
@@ -316,7 +338,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(3, 3, 3, 12, result1);
   }
 
-  @Test
+  @TestTemplate
   public void testFullTableRewritePath() throws Exception {
     RewriteTablePath.Result result =
         actions()
@@ -327,7 +349,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(3, 2, 2, 9, result);
   }
 
-  @Test
+  @TestTemplate
   public void testManifestRewriteAndIncrementalCopy() throws Exception {
     RewriteTablePath.Result initialResult =
         actions()
@@ -354,7 +376,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(1, 1, addedManifest, 3, postReweiteResult);
   }
 
-  @Test
+  @TestTemplate
   public void testDeleteDataFile() throws Exception {
     List<String> validDataFiles =
         spark
@@ -386,17 +408,17 @@ public class TestRewriteTablePathsAction extends TestBase 
{
         .hasSize(1);
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeletesParquet() throws Exception {
     runPositionDeletesTest("parquet");
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeletesAvro() throws Exception {
     runPositionDeletesTest("avro");
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeletesOrc() throws Exception {
     runPositionDeletesTest("orc");
   }
@@ -427,7 +449,8 @@ public class TestRewriteTablePathsAction extends TestBase {
         FileHelpers.writeDeleteFile(
                 tableWithPosDeletes,
                 
tableWithPosDeletes.io().newOutputFile(file.toURI().toString()),
-                deletes)
+                deletes,
+                formatVersion)
             .first();
 
     tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
@@ -454,7 +477,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .hasSize(1);
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeleteWithRow() throws Exception {
     String dataFileLocation =
         
table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();
@@ -467,7 +490,8 @@ public class TestRewriteTablePathsAction extends TestBase {
                     .toURI()
                     .toString());
     deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA", 
"AAAA"));
-    DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table, 
deleteFile, null, deletes);
+    DeleteFile positionDeletes =
+        FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes, 
formatVersion);
     table.newRowDelta().addDeletes(positionDeletes).commit();
 
     
assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
@@ -486,18 +510,24 @@ public class TestRewriteTablePathsAction extends TestBase 
{
     // copy the metadata files and data files
     copyTableFiles(result);
 
-    // check copied position delete row
-    Object[] deletedRow = (Object[]) rows(targetTableLocation() + 
"#position_deletes").get(0)[2];
-    assertEquals(
-        "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", 
"AAAA"}, deletedRow);
+    // check copied position delete row - only v2 stores row data with 
position deletes
+    // v3+ uses Deletion Vectors (DV) which only store position information
+    if (formatVersion == 2) {
+      Object[] deletedRow = (Object[]) rows(targetTableLocation() + 
"#position_deletes").get(0)[2];
+      assertEquals(
+          "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", 
"AAAA"}, deletedRow);
+    }
 
     // Positional delete affects a single row, so only one row must remain
     
assertThat(spark.read().format("iceberg").load(targetTableLocation()).collectAsList())
         .hasSize(1);
   }
 
-  @Test
+  @TestTemplate
   public void testPositionDeletesAcrossFiles() throws Exception {
+    assumeThat(formatVersion)
+        .as("Can't write multiple deletes into a single v3 delete file")
+        .isEqualTo(2);
     Stream<DataFile> allFiles =
         StreamSupport.stream(table.snapshots().spliterator(), false)
             .flatMap(s -> 
StreamSupport.stream(s.addedDataFiles(table.io()).spliterator(), false));
@@ -510,7 +540,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     File file = new File(removePrefix(table.location() + 
"/data/deeply/nested/file.parquet"));
     DeleteFile positionDeletes =
         FileHelpers.writeDeleteFile(
-                table, table.io().newOutputFile(file.toURI().toString()), 
deletes)
+                table, table.io().newOutputFile(file.toURI().toString()), 
deletes, formatVersion)
             .first();
 
     table.newRowDelta().addDeletes(positionDeletes).commit();
@@ -535,7 +565,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .isEmpty();
   }
 
-  @Test
+  @TestTemplate
   public void testEqualityDeletes() throws Exception {
     Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);
 
@@ -590,7 +620,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         .hasSize(2);
   }
 
-  @Test
+  @TestTemplate
   public void testFullTableRewritePathWithDeletedVersionFiles() throws 
Exception {
     String location = newTableLocation();
     Table sourceTable = createTableWithSnapshots(location, 2);
@@ -636,7 +666,7 @@ public class TestRewriteTablePathsAction extends TestBase {
         result);
   }
 
-  @Test
+  @TestTemplate
   public void testRewritePathWithoutSnapshot() throws Exception {
     RewriteTablePath.Result result =
         actions()
@@ -649,7 +679,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(1, 0, 0, 1, result);
   }
 
-  @Test
+  @TestTemplate
   public void testExpireSnapshotBeforeRewrite() throws Exception {
     // expire one snapshot
     
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -664,7 +694,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(4, 1, 2, 9, result);
   }
 
-  @Test
+  @TestTemplate
   public void testRewritePathWithNonLiveEntry() throws Exception {
     String location = newTableLocation();
     // first overwrite generate 1 manifest and 1 data file
@@ -729,7 +759,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation);
   }
 
-  @Test
+  @TestTemplate
   public void testStartSnapshotWithoutValidSnapshot() throws Exception {
     // expire one snapshot
     
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -748,7 +778,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(2, 1, 1, 5, result);
   }
 
-  @Test
+  @TestTemplate
   public void testMoveTheVersionExpireSnapshot() throws Exception {
     // expire one snapshot
     
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -766,7 +796,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(1, 0, 0, 1, result);
   }
 
-  @Test
+  @TestTemplate
   public void testMoveVersionWithInvalidSnapshots() {
     // expire one snapshot
     
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -785,7 +815,7 @@ public class TestRewriteTablePathsAction extends TestBase {
                 + "Please choose an earlier version without invalid 
snapshots.");
   }
 
-  @Test
+  @TestTemplate
   public void testRollBack() throws Exception {
     long secondSnapshotId = table.currentSnapshot().snapshotId();
 
@@ -812,7 +842,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(6, 3, 3, 15, result);
   }
 
-  @Test
+  @TestTemplate
   public void testWriteAuditPublish() throws Exception {
     // enable WAP
     table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, 
"true").commit();
@@ -837,7 +867,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(5, 3, 3, 14, result);
   }
 
-  @Test
+  @TestTemplate
   public void testSchemaChange() throws Exception {
     // change the schema
     table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
@@ -854,7 +884,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(4, 2, 2, 10, result);
   }
 
-  @Test
+  @TestTemplate
   public void testSnapshotIdInheritanceEnabled() throws Exception {
     String sourceTableLocation = newTableLocation();
     Map<String, String> properties = Maps.newHashMap();
@@ -872,7 +902,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(3, 2, 2, 9, result);
   }
 
-  @Test
+  @TestTemplate
   public void testMetadataCompression() throws Exception {
     String sourceTableLocation = newTableLocation();
     Map<String, String> properties = Maps.newHashMap();
@@ -898,7 +928,7 @@ public class TestRewriteTablePathsAction extends TestBase {
     checkFileNum(2, 2, 2, 8, result);
   }
 
-  @Test
+  @TestTemplate
   public void testInvalidArgs() {
     RewriteTablePath actions = actions().rewriteTablePath(table);
 
@@ -931,12 +961,12 @@ public class TestRewriteTablePathsAction extends TestBase 
{
         .hasMessageContaining("End version('null') cannot be empty");
   }
 
-  @Test
+  @TestTemplate
   public void testTableWithManyPartitionStatisticFile() throws IOException {
     String sourceTableLocation = newTableLocation();
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    String tableName = "v2tblwithPartStats";
+    Map<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion));
+    String tableName = String.format("v%stblwithPartStats", formatVersion);
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", 
tableName, 0, "c1");
 
@@ -963,12 +993,12 @@ public class TestRewriteTablePathsAction extends TestBase 
{
         result, "partition-stats", sourceTableLocation, targetTableLocation);
   }
 
-  @Test
+  @TestTemplate
   public void testTableWithManyStatisticFiles() throws IOException {
     String sourceTableLocation = newTableLocation();
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    String tableName = "v2tblwithmanystats";
+    Map<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion));
+    String tableName = String.format("v%stblwithmanystats", formatVersion);
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", 
tableName, 0);
 
@@ -992,12 +1022,12 @@ public class TestRewriteTablePathsAction extends 
TestBase {
         iterations * 2 + 1, iterations, iterations, iterations, iterations * 6 
+ 1, result);
   }
 
-  @Test
+  @TestTemplate
   public void testStatisticsFileSourcePath() throws IOException {
     String sourceTableLocation = newTableLocation();
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    String tableName = "v2tblwithstats";
+    Map<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion));
+    String tableName = String.format("v%stblwithstats", formatVersion);
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", 
tableName, 1);
 
@@ -1020,13 +1050,17 @@ public class TestRewriteTablePathsAction extends 
TestBase {
     findAndAssertFileInFileList(result, ".stats", sourceTableLocation, 
targetTableLocation);
   }
 
-  @Test
+  @TestTemplate
   public void testMetadataCompressionWithMetastoreTable() throws Exception {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
     Table sourceTable =
         createMetastoreTable(
-            newTableLocation(), properties, "default", 
"testMetadataCompression", 2);
+            newTableLocation(),
+            properties,
+            "default",
+            String.format("v%sMetadataCompression", formatVersion),
+            2);
 
     TableMetadata currentMetadata = currentMetadata(sourceTable);
 
@@ -1054,10 +1088,11 @@ public class TestRewriteTablePathsAction extends 
TestBase {
   }
 
   // Metastore table tests
-  @Test
+  @TestTemplate
   public void testMetadataLocationChange() throws Exception {
+    String tableName = String.format("v%stblWithLocation", formatVersion);
     Table sourceTable =
-        createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", 
"tbl", 1);
+        createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", 
tableName, 1);
     String metadataFilePath = 
currentMetadata(sourceTable).metadataFileLocation();
 
     String newMetadataDir = "new-metadata-dir";
@@ -1066,7 +1101,7 @@ public class TestRewriteTablePathsAction extends TestBase 
{
         .set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() + 
newMetadataDir)
         .commit();
 
-    spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')");
+    sql("insert into hive.default.%s values (1, 'AAAAAAAAAA', 'AAAA')", 
tableName);
     sourceTable.refresh();
 
     // copy table
@@ -1099,12 +1134,15 @@ public class TestRewriteTablePathsAction extends 
TestBase {
     checkFileNum(2, 1, 1, 5, result2);
   }
 
-  @Test
+  @TestTemplate
   public void testDeleteFrom() throws Exception {
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    properties.put("write.delete.mode", "merge-on-read");
-    String tableName = "v2tbl";
+    Map<String, String> properties =
+        ImmutableMap.of(
+            TableProperties.FORMAT_VERSION,
+            String.valueOf(formatVersion),
+            "write.delete.mode",
+            "merge-on-read");
+    String tableName = String.format("v%stbl", formatVersion);
     Table sourceTable =
         createMetastoreTable(newTableLocation(), properties, "default", 
tableName, 0);
     // ingest data
@@ -1152,7 +1190,7 @@ public class TestRewriteTablePathsAction extends TestBase 
{
     // register table
     String metadataLocation = 
currentMetadata(sourceTable).metadataFileLocation();
     String versionFile = fileName(metadataLocation);
-    String targetTableName = "copiedV2Table";
+    String targetTableName = String.format("copiedV%sTable", formatVersion);
     TableIdentifier tableIdentifier = TableIdentifier.of("default", 
targetTableName);
     catalog.registerTable(tableIdentifier, targetTableLocation() + 
"/metadata/" + versionFile);
 
@@ -1168,7 +1206,7 @@ public class TestRewriteTablePathsAction extends TestBase 
{
     assertEquals("Rows must match", originalData, copiedData);
   }
 
-  @Test
+  @TestTemplate
   public void testKryoDeserializeBroadcastValues() {
     sparkContext.getConf().set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
     RewriteTablePathSparkAction action =
@@ -1179,8 +1217,11 @@ public class TestRewriteTablePathsAction extends 
TestBase {
     assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
   }
 
-  @Test
+  @TestTemplate
   public void testNestedDirectoryStructurePreservation() throws Exception {
+    assumeThat(formatVersion)
+        .as("Can't add multiple DVs for the same data file in v3")
+        .isEqualTo(2);
     String sourceTableLocation = newTableLocation();
     Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);
 
@@ -1217,12 +1258,18 @@ public class TestRewriteTablePathsAction extends 
TestBase {
 
     DeleteFile positionDeletes1 =
         FileHelpers.writeDeleteFile(
-                sourceTable, 
sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+                sourceTable,
+                sourceTable.io().newOutputFile(file1.toURI().toString()),
+                deletes1,
+                formatVersion)
             .first();
 
     DeleteFile positionDeletes2 =
         FileHelpers.writeDeleteFile(
-                sourceTable, 
sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+                sourceTable,
+                sourceTable.io().newOutputFile(file2.toURI().toString()),
+                deletes2,
+                formatVersion)
             .first();
 
     sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
@@ -1266,7 +1313,7 @@ public class TestRewriteTablePathsAction extends TestBase 
{
     assertThat(targetPath2).startsWith(targetTableLocation());
   }
 
-  @Test
+  @TestTemplate
   public void testRewritePathWithoutCreateFileList() throws Exception {
     String targetTableLocation = targetTableLocation();
 

Reply via email to