This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d2046a7d9e Core, Spark: Preserve DV-specific fields for deletion
vectors (#14351)
d2046a7d9e is described below
commit d2046a7d9e7a94745e7cad610817df6b761dd085
Author: adawrapub <[email protected]>
AuthorDate: Tue Nov 11 23:46:26 2025 -0800
Core, Spark: Preserve DV-specific fields for deletion vectors (#14351)
DV files (Puffin format) store the referenced data file path in two
separate locations:
Manifest Metadata: DeleteFile.referencedDataFile() field
Puffin Blob Metadata: "referenced-data-file" property inside the blob
The original implementation only updated the manifest metadata. The Puffin
blob metadata still contained the old path, causing the DV reader to fail when
applying deletes at the new location.
I have implemented a two-pronged update strategy. Please let me know if
there is a better way.
1. Manifest Metadata Update
Added ContentFileUtil.replaceReferencedDataFile() utility method
Created RewriteTablePathUtil.newPositionDeleteEntry() helper method
Updates the referencedDataFile field in manifest entries
2. Puffin Content Update
Implemented RewriteTablePathUtil.rewriteDVFile() method
Reads Puffin files, updates blob metadata properties, writes new files
Preserves the bitmap data while updating path references
---
.../main/java/org/apache/iceberg/FileMetadata.java | 5 +
.../org/apache/iceberg/RewriteTablePathUtil.java | 129 +++++++++++++--
.../spark/actions/TestRewriteTablePathsAction.java | 175 +++++++++++++--------
.../spark/actions/TestRewriteTablePathsAction.java | 175 +++++++++++++--------
.../spark/actions/TestRewriteTablePathsAction.java | 175 +++++++++++++--------
5 files changed, 456 insertions(+), 203 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/FileMetadata.java
b/core/src/main/java/org/apache/iceberg/FileMetadata.java
index 4a0668976a..a5266101c2 100644
--- a/core/src/main/java/org/apache/iceberg/FileMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/FileMetadata.java
@@ -109,6 +109,11 @@ public class FileMetadata {
this.keyMetadata =
toCopy.keyMetadata() == null ? null :
ByteBuffers.copy(toCopy.keyMetadata());
this.sortOrderId = toCopy.sortOrderId();
+ this.splitOffsets = toCopy.splitOffsets();
+ // Preserve DV-specific fields for deletion vectors
+ this.referencedDataFile = toCopy.referencedDataFile();
+ this.contentOffset = toCopy.contentOffset();
+ this.contentSizeInBytes = toCopy.contentSizeInBytes();
return this;
}
diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
index ee7679f5e9..6f58b29315 100644
--- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
+++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -40,6 +41,11 @@ import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinCompressionCodec;
+import org.apache.iceberg.puffin.PuffinReader;
+import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -446,16 +452,8 @@ public class RewriteTablePathUtil {
switch (file.content()) {
case POSITION_DELETES:
- String targetDeleteFilePath = newPath(file.location(), sourcePrefix,
targetPrefix);
- Metrics metricsWithTargetPath =
- ContentFileUtil.replacePathBounds(file, sourcePrefix,
targetPrefix);
- DeleteFile movedFile =
- FileMetadata.deleteFileBuilder(spec)
- .copy(file)
- .withPath(targetDeleteFilePath)
- .withMetrics(metricsWithTargetPath)
- .build();
- appendEntryWithFile(entry, writer, movedFile);
+ DeleteFile posDeleteFile = newPositionDeleteEntry(file, spec,
sourcePrefix, targetPrefix);
+ appendEntryWithFile(entry, writer, posDeleteFile);
// keep the following entries in metadata but exclude them from
copyPlan
// 1) deleted position delete files
// 2) entries not changed by snapshotIds
@@ -465,7 +463,7 @@ public class RewriteTablePathUtil {
.add(
Pair.of(
stagingPath(file.location(), sourcePrefix,
stagingLocation),
- movedFile.location()));
+ posDeleteFile.location()));
}
result.toRewrite().add(file);
return result;
@@ -524,6 +522,56 @@ public class RewriteTablePathUtil {
.build();
}
+ private static DeleteFile newPositionDeleteEntry(
+ DeleteFile file, PartitionSpec spec, String sourcePrefix, String
targetPrefix) {
+ String path = file.location();
+ Preconditions.checkArgument(
+ path.startsWith(sourcePrefix),
+ "Expected delete file %s to start with prefix: %s",
+ path,
+ sourcePrefix);
+
+ FileMetadata.Builder builder =
+ FileMetadata.deleteFileBuilder(spec)
+ .copy(file)
+ .withPath(newPath(path, sourcePrefix, targetPrefix))
+ .withMetrics(ContentFileUtil.replacePathBounds(file, sourcePrefix,
targetPrefix));
+
+ // Update referencedDataFile for DV files
+ String newReferencedDataFile =
+ rewriteReferencedDataFilePathForDV(file, sourcePrefix, targetPrefix);
+ if (newReferencedDataFile != null) {
+ builder.withReferencedDataFile(newReferencedDataFile);
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Replace the referenced data file path for a DV (Deletion Vector) file.
+ *
+ * <p>For DV files, returns the updated path with the target prefix. For
non-DV files or files
+ * without a referenced data file, returns null.
+ *
+ * @param deleteFile delete file to check
+ * @param sourcePrefix source prefix that will be replaced
+ * @param targetPrefix target prefix that will replace it
+ * @return updated referenced data file path, or null if not applicable
+ */
+ private static String rewriteReferencedDataFilePathForDV(
+ DeleteFile deleteFile, String sourcePrefix, String targetPrefix) {
+ if (!ContentFileUtil.isDV(deleteFile) || deleteFile.referencedDataFile()
== null) {
+ return null;
+ }
+
+ String oldReferencedDataFile = deleteFile.referencedDataFile();
+ if (oldReferencedDataFile.startsWith(sourcePrefix)) {
+ return newPath(oldReferencedDataFile, sourcePrefix, targetPrefix);
+ }
+
+ return oldReferencedDataFile;
+ }
+
/** Class providing engine-specific methods to read and write position
delete files. */
public interface PositionDeleteReaderWriter extends Serializable {
CloseableIterable<Record> reader(InputFile inputFile, FileFormat format,
PartitionSpec spec);
@@ -562,6 +610,14 @@ public class RewriteTablePathUtil {
throw new UnsupportedOperationException(
String.format("Expected delete file %s to start with prefix: %s",
path, sourcePrefix));
}
+
+ // DV files (Puffin format for v3+) need special handling to rewrite
internal blob metadata
+ if (ContentFileUtil.isDV(deleteFile)) {
+ rewriteDVFile(deleteFile, outputFile, io, sourcePrefix, targetPrefix);
+ return;
+ }
+
+ // For non-DV position delete files (v2), rewrite using the reader/writer
InputFile sourceFile = io.newInputFile(path);
try (CloseableIterable<Record> reader =
posDeleteReaderWriter.reader(sourceFile, deleteFile.format(), spec)) {
@@ -592,6 +648,57 @@ public class RewriteTablePathUtil {
}
}
+ /**
+ * Rewrite a DV (Deletion Vector) file, updating the referenced data file
paths in blob metadata.
+ *
+ * @param deleteFile source DV file to be rewritten
+ * @param outputFile output file to write the rewritten DV to
+ * @param io file io
+ * @param sourcePrefix source prefix that will be replaced
+ * @param targetPrefix target prefix to replace it
+ */
+ private static void rewriteDVFile(
+ DeleteFile deleteFile,
+ OutputFile outputFile,
+ FileIO io,
+ String sourcePrefix,
+ String targetPrefix)
+ throws IOException {
+ List<Blob> rewrittenBlobs = Lists.newArrayList();
+ try (PuffinReader reader =
Puffin.read(io.newInputFile(deleteFile.location())).build()) {
+ // Read all blobs and rewrite them with updated referenced data file
paths
+ for (Pair<org.apache.iceberg.puffin.BlobMetadata, ByteBuffer> blobPair :
+ reader.readAll(reader.fileMetadata().blobs())) {
+ org.apache.iceberg.puffin.BlobMetadata blobMetadata = blobPair.first();
+ ByteBuffer blobData = blobPair.second();
+
+ // Get the original properties and update the referenced data file path
+ Map<String, String> properties =
Maps.newHashMap(blobMetadata.properties());
+ String referencedDataFile = properties.get("referenced-data-file");
+ if (referencedDataFile != null &&
referencedDataFile.startsWith(sourcePrefix)) {
+ String newReferencedDataFile = newPath(referencedDataFile,
sourcePrefix, targetPrefix);
+ properties.put("referenced-data-file", newReferencedDataFile);
+ }
+
+ // Create a new blob with updated properties
+ rewrittenBlobs.add(
+ new Blob(
+ blobMetadata.type(),
+ blobMetadata.inputFields(),
+ blobMetadata.snapshotId(),
+ blobMetadata.sequenceNumber(),
+ blobData,
+
PuffinCompressionCodec.forName(blobMetadata.compressionCodec()),
+ properties));
+ }
+ }
+
+ try (PuffinWriter writer =
+
Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) {
+ rewrittenBlobs.forEach(writer::write);
+ }
+ }
+
private static PositionDelete newPositionDeleteRecord(
Record record, String sourcePrefix, String targetPrefix) {
PositionDelete delete = PositionDelete.create();
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 6dac5d5da0..0bcaf0af65 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -22,6 +22,7 @@ import static
org.apache.iceberg.spark.actions.RewriteTablePathSparkAction.NOT_A
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
import java.io.IOException;
@@ -39,6 +40,9 @@ import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -59,6 +63,7 @@ import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -80,10 +85,12 @@ import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BroadcastBlockId;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import scala.Tuple2;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestRewriteTablePathsAction extends TestBase {
@TempDir private Path staging;
@@ -91,6 +98,13 @@ public class TestRewriteTablePathsAction extends TestBase {
@TempDir private Path newTableDir;
@TempDir private Path targetTableDir;
+ @Parameters(name = "formatVersion = {0}")
+ protected static List<Integer> formatVersions() {
+ return TestHelpers.V2_AND_ABOVE;
+ }
+
+ @Parameter private int formatVersion;
+
protected ActionsProvider actions() {
return SparkActions.get();
}
@@ -135,7 +149,15 @@ public class TestRewriteTablePathsAction extends TestBase {
private Table createTableWithSnapshots(
String location, int snapshotNumber, Map<String, String> properties,
String mode) {
- Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(),
properties, location);
+ Table newTable =
+ TABLES.create(
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.<String, String>builder()
+ .put(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion))
+ .putAll(properties)
+ .build(),
+ location);
List<ThreeColumnRecord> records =
Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
@@ -159,7 +181,7 @@ public class TestRewriteTablePathsAction extends TestBase {
sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs);
}
- @Test
+ @TestTemplate
public void testRewritePath() throws Exception {
String targetTableLocation = targetTableLocation();
@@ -207,7 +229,7 @@ public class TestRewriteTablePathsAction extends TestBase {
assertEquals("Rows should match after copy", expected, actual);
}
- @Test
+ @TestTemplate
public void testSameLocations() {
assertThatThrownBy(
() ->
@@ -220,7 +242,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.hasMessageContaining("Source prefix cannot be the same as target
prefix");
}
- @Test
+ @TestTemplate
public void testStartVersion() throws Exception {
RewriteTablePath.Result result =
actions()
@@ -244,7 +266,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.isEmpty();
}
- @Test
+ @TestTemplate
public void testIncrementalRewrite() throws Exception {
String location = newTableLocation();
Table sourceTable =
@@ -291,7 +313,7 @@ public class TestRewriteTablePathsAction extends TestBase {
assertEquals("Rows should match after copy", expected, actual);
}
- @Test
+ @TestTemplate
public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path
location2)
throws Exception {
String location = newTableLocation();
@@ -316,7 +338,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(3, 3, 3, 12, result1);
}
- @Test
+ @TestTemplate
public void testFullTableRewritePath() throws Exception {
RewriteTablePath.Result result =
actions()
@@ -327,7 +349,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(3, 2, 2, 9, result);
}
- @Test
+ @TestTemplate
public void testManifestRewriteAndIncrementalCopy() throws Exception {
RewriteTablePath.Result initialResult =
actions()
@@ -354,7 +376,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(1, 1, addedManifest, 3, postReweiteResult);
}
- @Test
+ @TestTemplate
public void testDeleteDataFile() throws Exception {
List<String> validDataFiles =
spark
@@ -386,17 +408,17 @@ public class TestRewriteTablePathsAction extends TestBase
{
.hasSize(1);
}
- @Test
+ @TestTemplate
public void testPositionDeletesParquet() throws Exception {
runPositionDeletesTest("parquet");
}
- @Test
+ @TestTemplate
public void testPositionDeletesAvro() throws Exception {
runPositionDeletesTest("avro");
}
- @Test
+ @TestTemplate
public void testPositionDeletesOrc() throws Exception {
runPositionDeletesTest("orc");
}
@@ -427,7 +449,8 @@ public class TestRewriteTablePathsAction extends TestBase {
FileHelpers.writeDeleteFile(
tableWithPosDeletes,
tableWithPosDeletes.io().newOutputFile(file.toURI().toString()),
- deletes)
+ deletes,
+ formatVersion)
.first();
tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
@@ -454,7 +477,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.hasSize(1);
}
- @Test
+ @TestTemplate
public void testPositionDeleteWithRow() throws Exception {
String dataFileLocation =
table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();
@@ -467,7 +490,8 @@ public class TestRewriteTablePathsAction extends TestBase {
.toURI()
.toString());
deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA",
"AAAA"));
- DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table,
deleteFile, null, deletes);
+ DeleteFile positionDeletes =
+ FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes,
formatVersion);
table.newRowDelta().addDeletes(positionDeletes).commit();
assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
@@ -486,18 +510,24 @@ public class TestRewriteTablePathsAction extends TestBase
{
// copy the metadata files and data files
copyTableFiles(result);
- // check copied position delete row
- Object[] deletedRow = (Object[]) rows(targetTableLocation() +
"#position_deletes").get(0)[2];
- assertEquals(
- "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA",
"AAAA"}, deletedRow);
+ // check copied position delete row - only v2 stores row data with
position deletes
+ // v3+ uses Deletion Vectors (DV) which only store position information
+ if (formatVersion == 2) {
+ Object[] deletedRow = (Object[]) rows(targetTableLocation() +
"#position_deletes").get(0)[2];
+ assertEquals(
+ "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA",
"AAAA"}, deletedRow);
+ }
// Positional delete affects a single row, so only one row must remain
assertThat(spark.read().format("iceberg").load(targetTableLocation()).collectAsList())
.hasSize(1);
}
- @Test
+ @TestTemplate
public void testPositionDeletesAcrossFiles() throws Exception {
+ assumeThat(formatVersion)
+ .as("Can't write multiple deletes into a single v3 delete file")
+ .isEqualTo(2);
Stream<DataFile> allFiles =
StreamSupport.stream(table.snapshots().spliterator(), false)
.flatMap(s ->
StreamSupport.stream(s.addedDataFiles(table.io()).spliterator(), false));
@@ -510,7 +540,7 @@ public class TestRewriteTablePathsAction extends TestBase {
File file = new File(removePrefix(table.location() +
"/data/deeply/nested/file.parquet"));
DeleteFile positionDeletes =
FileHelpers.writeDeleteFile(
- table, table.io().newOutputFile(file.toURI().toString()),
deletes)
+ table, table.io().newOutputFile(file.toURI().toString()),
deletes, formatVersion)
.first();
table.newRowDelta().addDeletes(positionDeletes).commit();
@@ -535,7 +565,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.isEmpty();
}
- @Test
+ @TestTemplate
public void testEqualityDeletes() throws Exception {
Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);
@@ -590,7 +620,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.hasSize(2);
}
- @Test
+ @TestTemplate
public void testFullTableRewritePathWithDeletedVersionFiles() throws
Exception {
String location = newTableLocation();
Table sourceTable = createTableWithSnapshots(location, 2);
@@ -636,7 +666,7 @@ public class TestRewriteTablePathsAction extends TestBase {
result);
}
- @Test
+ @TestTemplate
public void testRewritePathWithoutSnapshot() throws Exception {
RewriteTablePath.Result result =
actions()
@@ -649,7 +679,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(1, 0, 0, 1, result);
}
- @Test
+ @TestTemplate
public void testExpireSnapshotBeforeRewrite() throws Exception {
// expire one snapshot
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -664,7 +694,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(4, 1, 2, 9, result);
}
- @Test
+ @TestTemplate
public void testRewritePathWithNonLiveEntry() throws Exception {
String location = newTableLocation();
// first overwrite generate 1 manifest and 1 data file
@@ -729,7 +759,7 @@ public class TestRewriteTablePathsAction extends TestBase {
assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation);
}
- @Test
+ @TestTemplate
public void testStartSnapshotWithoutValidSnapshot() throws Exception {
// expire one snapshot
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -748,7 +778,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(2, 1, 1, 5, result);
}
- @Test
+ @TestTemplate
public void testMoveTheVersionExpireSnapshot() throws Exception {
// expire one snapshot
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -766,7 +796,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(1, 0, 0, 1, result);
}
- @Test
+ @TestTemplate
public void testMoveVersionWithInvalidSnapshots() {
// expire one snapshot
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -785,7 +815,7 @@ public class TestRewriteTablePathsAction extends TestBase {
+ "Please choose an earlier version without invalid
snapshots.");
}
- @Test
+ @TestTemplate
public void testRollBack() throws Exception {
long secondSnapshotId = table.currentSnapshot().snapshotId();
@@ -812,7 +842,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(6, 3, 3, 15, result);
}
- @Test
+ @TestTemplate
public void testWriteAuditPublish() throws Exception {
// enable WAP
table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED,
"true").commit();
@@ -837,7 +867,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(5, 3, 3, 14, result);
}
- @Test
+ @TestTemplate
public void testSchemaChange() throws Exception {
// change the schema
table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
@@ -854,7 +884,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(4, 2, 2, 10, result);
}
- @Test
+ @TestTemplate
public void testSnapshotIdInheritanceEnabled() throws Exception {
String sourceTableLocation = newTableLocation();
Map<String, String> properties = Maps.newHashMap();
@@ -872,7 +902,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(3, 2, 2, 9, result);
}
- @Test
+ @TestTemplate
public void testMetadataCompression() throws Exception {
String sourceTableLocation = newTableLocation();
Map<String, String> properties = Maps.newHashMap();
@@ -898,7 +928,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(2, 2, 2, 8, result);
}
- @Test
+ @TestTemplate
public void testInvalidArgs() {
RewriteTablePath actions = actions().rewriteTablePath(table);
@@ -931,12 +961,12 @@ public class TestRewriteTablePathsAction extends TestBase
{
.hasMessageContaining("End version('null') cannot be empty");
}
- @Test
+ @TestTemplate
public void testTableWithManyPartitionStatisticFile() throws IOException {
String sourceTableLocation = newTableLocation();
- Map<String, String> properties = Maps.newHashMap();
- properties.put("format-version", "2");
- String tableName = "v2tblwithPartStats";
+ Map<String, String> properties =
+ ImmutableMap.of(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion));
+ String tableName = String.format("v%stblwithPartStats", formatVersion);
Table sourceTable =
createMetastoreTable(sourceTableLocation, properties, "default",
tableName, 0, "c1");
@@ -963,12 +993,12 @@ public class TestRewriteTablePathsAction extends TestBase
{
result, "partition-stats", sourceTableLocation, targetTableLocation);
}
- @Test
+ @TestTemplate
public void testTableWithManyStatisticFiles() throws IOException {
String sourceTableLocation = newTableLocation();
- Map<String, String> properties = Maps.newHashMap();
- properties.put("format-version", "2");
- String tableName = "v2tblwithmanystats";
+ Map<String, String> properties =
+ ImmutableMap.of(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion));
+ String tableName = String.format("v%stblwithmanystats", formatVersion);
Table sourceTable =
createMetastoreTable(sourceTableLocation, properties, "default",
tableName, 0);
@@ -992,12 +1022,12 @@ public class TestRewriteTablePathsAction extends
TestBase {
iterations * 2 + 1, iterations, iterations, iterations, iterations * 6
+ 1, result);
}
- @Test
+ @TestTemplate
public void testStatisticsFileSourcePath() throws IOException {
String sourceTableLocation = newTableLocation();
- Map<String, String> properties = Maps.newHashMap();
- properties.put("format-version", "2");
- String tableName = "v2tblwithstats";
+ Map<String, String> properties =
+ ImmutableMap.of(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion));
+ String tableName = String.format("v%stblwithstats", formatVersion);
Table sourceTable =
createMetastoreTable(sourceTableLocation, properties, "default",
tableName, 1);
@@ -1020,13 +1050,17 @@ public class TestRewriteTablePathsAction extends
TestBase {
findAndAssertFileInFileList(result, ".stats", sourceTableLocation,
targetTableLocation);
}
- @Test
+ @TestTemplate
public void testMetadataCompressionWithMetastoreTable() throws Exception {
Map<String, String> properties = Maps.newHashMap();
properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
Table sourceTable =
createMetastoreTable(
- newTableLocation(), properties, "default",
"testMetadataCompression", 2);
+ newTableLocation(),
+ properties,
+ "default",
+ String.format("v%sMetadataCompression", formatVersion),
+ 2);
TableMetadata currentMetadata = currentMetadata(sourceTable);
@@ -1054,10 +1088,11 @@ public class TestRewriteTablePathsAction extends
TestBase {
}
// Metastore table tests
- @Test
+ @TestTemplate
public void testMetadataLocationChange() throws Exception {
+ String tableName = String.format("v%stblWithLocation", formatVersion);
Table sourceTable =
- createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default",
"tbl", 1);
+ createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default",
tableName, 1);
String metadataFilePath =
currentMetadata(sourceTable).metadataFileLocation();
String newMetadataDir = "new-metadata-dir";
@@ -1066,7 +1101,7 @@ public class TestRewriteTablePathsAction extends TestBase
{
.set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() +
newMetadataDir)
.commit();
- spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')");
+ sql("insert into hive.default.%s values (1, 'AAAAAAAAAA', 'AAAA')",
tableName);
sourceTable.refresh();
// copy table
@@ -1099,12 +1134,15 @@ public class TestRewriteTablePathsAction extends
TestBase {
checkFileNum(2, 1, 1, 5, result2);
}
- @Test
+ @TestTemplate
public void testDeleteFrom() throws Exception {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("format-version", "2");
- properties.put("write.delete.mode", "merge-on-read");
- String tableName = "v2tbl";
+ Map<String, String> properties =
+ ImmutableMap.of(
+ TableProperties.FORMAT_VERSION,
+ String.valueOf(formatVersion),
+ "write.delete.mode",
+ "merge-on-read");
+ String tableName = String.format("v%stbl", formatVersion);
Table sourceTable =
createMetastoreTable(newTableLocation(), properties, "default",
tableName, 0);
// ingest data
@@ -1152,7 +1190,7 @@ public class TestRewriteTablePathsAction extends TestBase
{
// register table
String metadataLocation =
currentMetadata(sourceTable).metadataFileLocation();
String versionFile = fileName(metadataLocation);
- String targetTableName = "copiedV2Table";
+ String targetTableName = String.format("copiedV%sTable", formatVersion);
TableIdentifier tableIdentifier = TableIdentifier.of("default",
targetTableName);
catalog.registerTable(tableIdentifier, targetTableLocation() +
"/metadata/" + versionFile);
@@ -1168,7 +1206,7 @@ public class TestRewriteTablePathsAction extends TestBase
{
assertEquals("Rows must match", originalData, copiedData);
}
- @Test
+ @TestTemplate
public void testKryoDeserializeBroadcastValues() {
sparkContext.getConf().set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
RewriteTablePathSparkAction action =
@@ -1179,8 +1217,11 @@ public class TestRewriteTablePathsAction extends
TestBase {
assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
}
- @Test
+ @TestTemplate
public void testNestedDirectoryStructurePreservation() throws Exception {
+ assumeThat(formatVersion)
+ .as("Can't add multiple DVs for the same data file in v3")
+ .isEqualTo(2);
String sourceTableLocation = newTableLocation();
Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);
@@ -1217,12 +1258,18 @@ public class TestRewriteTablePathsAction extends
TestBase {
DeleteFile positionDeletes1 =
FileHelpers.writeDeleteFile(
- sourceTable,
sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+ sourceTable,
+ sourceTable.io().newOutputFile(file1.toURI().toString()),
+ deletes1,
+ formatVersion)
.first();
DeleteFile positionDeletes2 =
FileHelpers.writeDeleteFile(
- sourceTable,
sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+ sourceTable,
+ sourceTable.io().newOutputFile(file2.toURI().toString()),
+ deletes2,
+ formatVersion)
.first();
sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
@@ -1266,7 +1313,7 @@ public class TestRewriteTablePathsAction extends TestBase
{
assertThat(targetPath2).startsWith(targetTableLocation());
}
- @Test
+ @TestTemplate
public void testRewritePathWithoutCreateFileList() throws Exception {
String targetTableLocation = targetTableLocation();
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 6dac5d5da0..0bcaf0af65 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -22,6 +22,7 @@ import static
org.apache.iceberg.spark.actions.RewriteTablePathSparkAction.NOT_A
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
import java.io.IOException;
@@ -39,6 +40,9 @@ import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -59,6 +63,7 @@ import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -80,10 +85,12 @@ import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BroadcastBlockId;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import scala.Tuple2;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestRewriteTablePathsAction extends TestBase {
@TempDir private Path staging;
@@ -91,6 +98,13 @@ public class TestRewriteTablePathsAction extends TestBase {
@TempDir private Path newTableDir;
@TempDir private Path targetTableDir;
+ @Parameters(name = "formatVersion = {0}")
+ protected static List<Integer> formatVersions() {
+ return TestHelpers.V2_AND_ABOVE;
+ }
+
+ @Parameter private int formatVersion;
+
protected ActionsProvider actions() {
return SparkActions.get();
}
@@ -135,7 +149,15 @@ public class TestRewriteTablePathsAction extends TestBase {
private Table createTableWithSnapshots(
String location, int snapshotNumber, Map<String, String> properties,
String mode) {
- Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(),
properties, location);
+ Table newTable =
+ TABLES.create(
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.<String, String>builder()
+ .put(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion))
+ .putAll(properties)
+ .build(),
+ location);
List<ThreeColumnRecord> records =
Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
@@ -159,7 +181,7 @@ public class TestRewriteTablePathsAction extends TestBase {
sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs);
}
- @Test
+ @TestTemplate
public void testRewritePath() throws Exception {
String targetTableLocation = targetTableLocation();
@@ -207,7 +229,7 @@ public class TestRewriteTablePathsAction extends TestBase {
assertEquals("Rows should match after copy", expected, actual);
}
- @Test
+ @TestTemplate
public void testSameLocations() {
assertThatThrownBy(
() ->
@@ -220,7 +242,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.hasMessageContaining("Source prefix cannot be the same as target prefix");
}
- @Test
+ @TestTemplate
public void testStartVersion() throws Exception {
RewriteTablePath.Result result =
actions()
@@ -244,7 +266,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.isEmpty();
}
- @Test
+ @TestTemplate
public void testIncrementalRewrite() throws Exception {
String location = newTableLocation();
Table sourceTable =
@@ -291,7 +313,7 @@ public class TestRewriteTablePathsAction extends TestBase {
assertEquals("Rows should match after copy", expected, actual);
}
- @Test
+ @TestTemplate
public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path location2)
throws Exception {
String location = newTableLocation();
@@ -316,7 +338,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(3, 3, 3, 12, result1);
}
- @Test
+ @TestTemplate
public void testFullTableRewritePath() throws Exception {
RewriteTablePath.Result result =
actions()
@@ -327,7 +349,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(3, 2, 2, 9, result);
}
- @Test
+ @TestTemplate
public void testManifestRewriteAndIncrementalCopy() throws Exception {
RewriteTablePath.Result initialResult =
actions()
@@ -354,7 +376,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(1, 1, addedManifest, 3, postReweiteResult);
}
- @Test
+ @TestTemplate
public void testDeleteDataFile() throws Exception {
List<String> validDataFiles =
spark
@@ -386,17 +408,17 @@ public class TestRewriteTablePathsAction extends TestBase
{
.hasSize(1);
}
- @Test
+ @TestTemplate
public void testPositionDeletesParquet() throws Exception {
runPositionDeletesTest("parquet");
}
- @Test
+ @TestTemplate
public void testPositionDeletesAvro() throws Exception {
runPositionDeletesTest("avro");
}
- @Test
+ @TestTemplate
public void testPositionDeletesOrc() throws Exception {
runPositionDeletesTest("orc");
}
@@ -427,7 +449,8 @@ public class TestRewriteTablePathsAction extends TestBase {
FileHelpers.writeDeleteFile(
tableWithPosDeletes,
tableWithPosDeletes.io().newOutputFile(file.toURI().toString()),
- deletes)
+ deletes,
+ formatVersion)
.first();
tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
@@ -454,7 +477,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.hasSize(1);
}
- @Test
+ @TestTemplate
public void testPositionDeleteWithRow() throws Exception {
String dataFileLocation =
table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();
@@ -467,7 +490,8 @@ public class TestRewriteTablePathsAction extends TestBase {
.toURI()
.toString());
deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA",
"AAAA"));
- DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table,
deleteFile, null, deletes);
+ DeleteFile positionDeletes =
+ FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes,
formatVersion);
table.newRowDelta().addDeletes(positionDeletes).commit();
assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
@@ -486,18 +510,24 @@ public class TestRewriteTablePathsAction extends TestBase
{
// copy the metadata files and data files
copyTableFiles(result);
- // check copied position delete row
- Object[] deletedRow = (Object[]) rows(targetTableLocation() +
"#position_deletes").get(0)[2];
- assertEquals(
- "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA",
"AAAA"}, deletedRow);
+ // check copied position delete row - only v2 stores row data with
position deletes
+ // v3+ uses Deletion Vectors (DV) which only store position information
+ if (formatVersion == 2) {
+ Object[] deletedRow = (Object[]) rows(targetTableLocation() +
"#position_deletes").get(0)[2];
+ assertEquals(
+ "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA",
"AAAA"}, deletedRow);
+ }
// Positional delete affects a single row, so only one row must remain
assertThat(spark.read().format("iceberg").load(targetTableLocation()).collectAsList())
.hasSize(1);
}
- @Test
+ @TestTemplate
public void testPositionDeletesAcrossFiles() throws Exception {
+ assumeThat(formatVersion)
+ .as("Can't write multiple deletes into a single v3 delete file")
+ .isEqualTo(2);
Stream<DataFile> allFiles =
StreamSupport.stream(table.snapshots().spliterator(), false)
.flatMap(s ->
StreamSupport.stream(s.addedDataFiles(table.io()).spliterator(), false));
@@ -510,7 +540,7 @@ public class TestRewriteTablePathsAction extends TestBase {
File file = new File(removePrefix(table.location() +
"/data/deeply/nested/file.parquet"));
DeleteFile positionDeletes =
FileHelpers.writeDeleteFile(
- table, table.io().newOutputFile(file.toURI().toString()),
deletes)
+ table, table.io().newOutputFile(file.toURI().toString()),
deletes, formatVersion)
.first();
table.newRowDelta().addDeletes(positionDeletes).commit();
@@ -535,7 +565,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.isEmpty();
}
- @Test
+ @TestTemplate
public void testEqualityDeletes() throws Exception {
Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);
@@ -590,7 +620,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.hasSize(2);
}
- @Test
+ @TestTemplate
public void testFullTableRewritePathWithDeletedVersionFiles() throws
Exception {
String location = newTableLocation();
Table sourceTable = createTableWithSnapshots(location, 2);
@@ -636,7 +666,7 @@ public class TestRewriteTablePathsAction extends TestBase {
result);
}
- @Test
+ @TestTemplate
public void testRewritePathWithoutSnapshot() throws Exception {
RewriteTablePath.Result result =
actions()
@@ -649,7 +679,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(1, 0, 0, 1, result);
}
- @Test
+ @TestTemplate
public void testExpireSnapshotBeforeRewrite() throws Exception {
// expire one snapshot
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -664,7 +694,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(4, 1, 2, 9, result);
}
- @Test
+ @TestTemplate
public void testRewritePathWithNonLiveEntry() throws Exception {
String location = newTableLocation();
// first overwrite generate 1 manifest and 1 data file
@@ -729,7 +759,7 @@ public class TestRewriteTablePathsAction extends TestBase {
assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation);
}
- @Test
+ @TestTemplate
public void testStartSnapshotWithoutValidSnapshot() throws Exception {
// expire one snapshot
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -748,7 +778,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(2, 1, 1, 5, result);
}
- @Test
+ @TestTemplate
public void testMoveTheVersionExpireSnapshot() throws Exception {
// expire one snapshot
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -766,7 +796,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(1, 0, 0, 1, result);
}
- @Test
+ @TestTemplate
public void testMoveVersionWithInvalidSnapshots() {
// expire one snapshot
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -785,7 +815,7 @@ public class TestRewriteTablePathsAction extends TestBase {
+ "Please choose an earlier version without invalid snapshots.");
}
- @Test
+ @TestTemplate
public void testRollBack() throws Exception {
long secondSnapshotId = table.currentSnapshot().snapshotId();
@@ -812,7 +842,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(6, 3, 3, 15, result);
}
- @Test
+ @TestTemplate
public void testWriteAuditPublish() throws Exception {
// enable WAP
table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit();
@@ -837,7 +867,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(5, 3, 3, 14, result);
}
- @Test
+ @TestTemplate
public void testSchemaChange() throws Exception {
// change the schema
table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
@@ -854,7 +884,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(4, 2, 2, 10, result);
}
- @Test
+ @TestTemplate
public void testSnapshotIdInheritanceEnabled() throws Exception {
String sourceTableLocation = newTableLocation();
Map<String, String> properties = Maps.newHashMap();
@@ -872,7 +902,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(3, 2, 2, 9, result);
}
- @Test
+ @TestTemplate
public void testMetadataCompression() throws Exception {
String sourceTableLocation = newTableLocation();
Map<String, String> properties = Maps.newHashMap();
@@ -898,7 +928,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(2, 2, 2, 8, result);
}
- @Test
+ @TestTemplate
public void testInvalidArgs() {
RewriteTablePath actions = actions().rewriteTablePath(table);
@@ -931,12 +961,12 @@ public class TestRewriteTablePathsAction extends TestBase
{
.hasMessageContaining("End version('null') cannot be empty");
}
- @Test
+ @TestTemplate
public void testTableWithManyPartitionStatisticFile() throws IOException {
String sourceTableLocation = newTableLocation();
- Map<String, String> properties = Maps.newHashMap();
- properties.put("format-version", "2");
- String tableName = "v2tblwithPartStats";
+ Map<String, String> properties =
+ ImmutableMap.of(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion));
+ String tableName = String.format("v%stblwithPartStats", formatVersion);
Table sourceTable =
createMetastoreTable(sourceTableLocation, properties, "default",
tableName, 0, "c1");
@@ -963,12 +993,12 @@ public class TestRewriteTablePathsAction extends TestBase
{
result, "partition-stats", sourceTableLocation, targetTableLocation);
}
- @Test
+ @TestTemplate
public void testTableWithManyStatisticFiles() throws IOException {
String sourceTableLocation = newTableLocation();
- Map<String, String> properties = Maps.newHashMap();
- properties.put("format-version", "2");
- String tableName = "v2tblwithmanystats";
+ Map<String, String> properties =
+ ImmutableMap.of(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion));
+ String tableName = String.format("v%stblwithmanystats", formatVersion);
Table sourceTable =
createMetastoreTable(sourceTableLocation, properties, "default",
tableName, 0);
@@ -992,12 +1022,12 @@ public class TestRewriteTablePathsAction extends
TestBase {
iterations * 2 + 1, iterations, iterations, iterations, iterations * 6
+ 1, result);
}
- @Test
+ @TestTemplate
public void testStatisticsFileSourcePath() throws IOException {
String sourceTableLocation = newTableLocation();
- Map<String, String> properties = Maps.newHashMap();
- properties.put("format-version", "2");
- String tableName = "v2tblwithstats";
+ Map<String, String> properties =
+ ImmutableMap.of(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion));
+ String tableName = String.format("v%stblwithstats", formatVersion);
Table sourceTable =
createMetastoreTable(sourceTableLocation, properties, "default",
tableName, 1);
@@ -1020,13 +1050,17 @@ public class TestRewriteTablePathsAction extends
TestBase {
findAndAssertFileInFileList(result, ".stats", sourceTableLocation,
targetTableLocation);
}
- @Test
+ @TestTemplate
public void testMetadataCompressionWithMetastoreTable() throws Exception {
Map<String, String> properties = Maps.newHashMap();
properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
Table sourceTable =
createMetastoreTable(
- newTableLocation(), properties, "default",
"testMetadataCompression", 2);
+ newTableLocation(),
+ properties,
+ "default",
+ String.format("v%sMetadataCompression", formatVersion),
+ 2);
TableMetadata currentMetadata = currentMetadata(sourceTable);
@@ -1054,10 +1088,11 @@ public class TestRewriteTablePathsAction extends
TestBase {
}
// Metastore table tests
- @Test
+ @TestTemplate
public void testMetadataLocationChange() throws Exception {
+ String tableName = String.format("v%stblWithLocation", formatVersion);
Table sourceTable =
- createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default",
"tbl", 1);
+ createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default",
tableName, 1);
String metadataFilePath =
currentMetadata(sourceTable).metadataFileLocation();
String newMetadataDir = "new-metadata-dir";
@@ -1066,7 +1101,7 @@ public class TestRewriteTablePathsAction extends TestBase
{
.set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() +
newMetadataDir)
.commit();
- spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')");
+ sql("insert into hive.default.%s values (1, 'AAAAAAAAAA', 'AAAA')", tableName);
sourceTable.refresh();
// copy table
@@ -1099,12 +1134,15 @@ public class TestRewriteTablePathsAction extends
TestBase {
checkFileNum(2, 1, 1, 5, result2);
}
- @Test
+ @TestTemplate
public void testDeleteFrom() throws Exception {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("format-version", "2");
- properties.put("write.delete.mode", "merge-on-read");
- String tableName = "v2tbl";
+ Map<String, String> properties =
+ ImmutableMap.of(
+ TableProperties.FORMAT_VERSION,
+ String.valueOf(formatVersion),
+ "write.delete.mode",
+ "merge-on-read");
+ String tableName = String.format("v%stbl", formatVersion);
Table sourceTable =
createMetastoreTable(newTableLocation(), properties, "default",
tableName, 0);
// ingest data
@@ -1152,7 +1190,7 @@ public class TestRewriteTablePathsAction extends TestBase
{
// register table
String metadataLocation =
currentMetadata(sourceTable).metadataFileLocation();
String versionFile = fileName(metadataLocation);
- String targetTableName = "copiedV2Table";
+ String targetTableName = String.format("copiedV%sTable", formatVersion);
TableIdentifier tableIdentifier = TableIdentifier.of("default",
targetTableName);
catalog.registerTable(tableIdentifier, targetTableLocation() +
"/metadata/" + versionFile);
@@ -1168,7 +1206,7 @@ public class TestRewriteTablePathsAction extends TestBase
{
assertEquals("Rows must match", originalData, copiedData);
}
- @Test
+ @TestTemplate
public void testKryoDeserializeBroadcastValues() {
sparkContext.getConf().set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
RewriteTablePathSparkAction action =
@@ -1179,8 +1217,11 @@ public class TestRewriteTablePathsAction extends
TestBase {
assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
}
- @Test
+ @TestTemplate
public void testNestedDirectoryStructurePreservation() throws Exception {
+ assumeThat(formatVersion)
+ .as("Can't add multiple DVs for the same data file in v3")
+ .isEqualTo(2);
String sourceTableLocation = newTableLocation();
Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);
@@ -1217,12 +1258,18 @@ public class TestRewriteTablePathsAction extends
TestBase {
DeleteFile positionDeletes1 =
FileHelpers.writeDeleteFile(
- sourceTable,
sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+ sourceTable,
+ sourceTable.io().newOutputFile(file1.toURI().toString()),
+ deletes1,
+ formatVersion)
.first();
DeleteFile positionDeletes2 =
FileHelpers.writeDeleteFile(
- sourceTable,
sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+ sourceTable,
+ sourceTable.io().newOutputFile(file2.toURI().toString()),
+ deletes2,
+ formatVersion)
.first();
sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
@@ -1266,7 +1313,7 @@ public class TestRewriteTablePathsAction extends TestBase
{
assertThat(targetPath2).startsWith(targetTableLocation());
}
- @Test
+ @TestTemplate
public void testRewritePathWithoutCreateFileList() throws Exception {
String targetTableLocation = targetTableLocation();
diff --git
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 6dac5d5da0..0bcaf0af65 100644
---
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -22,6 +22,7 @@ import static
org.apache.iceberg.spark.actions.RewriteTablePathSparkAction.NOT_A
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
import java.io.IOException;
@@ -39,6 +40,9 @@ import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -59,6 +63,7 @@ import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -80,10 +85,12 @@ import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BroadcastBlockId;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import scala.Tuple2;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestRewriteTablePathsAction extends TestBase {
@TempDir private Path staging;
@@ -91,6 +98,13 @@ public class TestRewriteTablePathsAction extends TestBase {
@TempDir private Path newTableDir;
@TempDir private Path targetTableDir;
+ @Parameters(name = "formatVersion = {0}")
+ protected static List<Integer> formatVersions() {
+ return TestHelpers.V2_AND_ABOVE;
+ }
+
+ @Parameter private int formatVersion;
+
protected ActionsProvider actions() {
return SparkActions.get();
}
@@ -135,7 +149,15 @@ public class TestRewriteTablePathsAction extends TestBase {
private Table createTableWithSnapshots(
String location, int snapshotNumber, Map<String, String> properties,
String mode) {
- Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(),
properties, location);
+ Table newTable =
+ TABLES.create(
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.<String, String>builder()
+ .put(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion))
+ .putAll(properties)
+ .build(),
+ location);
List<ThreeColumnRecord> records =
Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
@@ -159,7 +181,7 @@ public class TestRewriteTablePathsAction extends TestBase {
sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs);
}
- @Test
+ @TestTemplate
public void testRewritePath() throws Exception {
String targetTableLocation = targetTableLocation();
@@ -207,7 +229,7 @@ public class TestRewriteTablePathsAction extends TestBase {
assertEquals("Rows should match after copy", expected, actual);
}
- @Test
+ @TestTemplate
public void testSameLocations() {
assertThatThrownBy(
() ->
@@ -220,7 +242,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.hasMessageContaining("Source prefix cannot be the same as target prefix");
}
- @Test
+ @TestTemplate
public void testStartVersion() throws Exception {
RewriteTablePath.Result result =
actions()
@@ -244,7 +266,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.isEmpty();
}
- @Test
+ @TestTemplate
public void testIncrementalRewrite() throws Exception {
String location = newTableLocation();
Table sourceTable =
@@ -291,7 +313,7 @@ public class TestRewriteTablePathsAction extends TestBase {
assertEquals("Rows should match after copy", expected, actual);
}
- @Test
+ @TestTemplate
public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path location2)
throws Exception {
String location = newTableLocation();
@@ -316,7 +338,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(3, 3, 3, 12, result1);
}
- @Test
+ @TestTemplate
public void testFullTableRewritePath() throws Exception {
RewriteTablePath.Result result =
actions()
@@ -327,7 +349,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(3, 2, 2, 9, result);
}
- @Test
+ @TestTemplate
public void testManifestRewriteAndIncrementalCopy() throws Exception {
RewriteTablePath.Result initialResult =
actions()
@@ -354,7 +376,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(1, 1, addedManifest, 3, postReweiteResult);
}
- @Test
+ @TestTemplate
public void testDeleteDataFile() throws Exception {
List<String> validDataFiles =
spark
@@ -386,17 +408,17 @@ public class TestRewriteTablePathsAction extends TestBase
{
.hasSize(1);
}
- @Test
+ @TestTemplate
public void testPositionDeletesParquet() throws Exception {
runPositionDeletesTest("parquet");
}
- @Test
+ @TestTemplate
public void testPositionDeletesAvro() throws Exception {
runPositionDeletesTest("avro");
}
- @Test
+ @TestTemplate
public void testPositionDeletesOrc() throws Exception {
runPositionDeletesTest("orc");
}
@@ -427,7 +449,8 @@ public class TestRewriteTablePathsAction extends TestBase {
FileHelpers.writeDeleteFile(
tableWithPosDeletes,
tableWithPosDeletes.io().newOutputFile(file.toURI().toString()),
- deletes)
+ deletes,
+ formatVersion)
.first();
tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
@@ -454,7 +477,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.hasSize(1);
}
- @Test
+ @TestTemplate
public void testPositionDeleteWithRow() throws Exception {
String dataFileLocation =
table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();
@@ -467,7 +490,8 @@ public class TestRewriteTablePathsAction extends TestBase {
.toURI()
.toString());
deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA",
"AAAA"));
- DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table,
deleteFile, null, deletes);
+ DeleteFile positionDeletes =
+ FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes,
formatVersion);
table.newRowDelta().addDeletes(positionDeletes).commit();
assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
@@ -486,18 +510,24 @@ public class TestRewriteTablePathsAction extends TestBase
{
// copy the metadata files and data files
copyTableFiles(result);
- // check copied position delete row
- Object[] deletedRow = (Object[]) rows(targetTableLocation() +
"#position_deletes").get(0)[2];
- assertEquals(
- "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA",
"AAAA"}, deletedRow);
+ // check copied position delete row - only v2 stores row data with
position deletes
+ // v3+ uses Deletion Vectors (DV) which only store position information
+ if (formatVersion == 2) {
+ Object[] deletedRow = (Object[]) rows(targetTableLocation() +
"#position_deletes").get(0)[2];
+ assertEquals(
+ "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA",
"AAAA"}, deletedRow);
+ }
// Positional delete affects a single row, so only one row must remain
assertThat(spark.read().format("iceberg").load(targetTableLocation()).collectAsList())
.hasSize(1);
}
- @Test
+ @TestTemplate
public void testPositionDeletesAcrossFiles() throws Exception {
+ assumeThat(formatVersion)
+ .as("Can't write multiple deletes into a single v3 delete file")
+ .isEqualTo(2);
Stream<DataFile> allFiles =
StreamSupport.stream(table.snapshots().spliterator(), false)
.flatMap(s ->
StreamSupport.stream(s.addedDataFiles(table.io()).spliterator(), false));
@@ -510,7 +540,7 @@ public class TestRewriteTablePathsAction extends TestBase {
File file = new File(removePrefix(table.location() +
"/data/deeply/nested/file.parquet"));
DeleteFile positionDeletes =
FileHelpers.writeDeleteFile(
- table, table.io().newOutputFile(file.toURI().toString()),
deletes)
+ table, table.io().newOutputFile(file.toURI().toString()),
deletes, formatVersion)
.first();
table.newRowDelta().addDeletes(positionDeletes).commit();
@@ -535,7 +565,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.isEmpty();
}
- @Test
+ @TestTemplate
public void testEqualityDeletes() throws Exception {
Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);
@@ -590,7 +620,7 @@ public class TestRewriteTablePathsAction extends TestBase {
.hasSize(2);
}
- @Test
+ @TestTemplate
public void testFullTableRewritePathWithDeletedVersionFiles() throws
Exception {
String location = newTableLocation();
Table sourceTable = createTableWithSnapshots(location, 2);
@@ -636,7 +666,7 @@ public class TestRewriteTablePathsAction extends TestBase {
result);
}
- @Test
+ @TestTemplate
public void testRewritePathWithoutSnapshot() throws Exception {
RewriteTablePath.Result result =
actions()
@@ -649,7 +679,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(1, 0, 0, 1, result);
}
- @Test
+ @TestTemplate
public void testExpireSnapshotBeforeRewrite() throws Exception {
// expire one snapshot
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -664,7 +694,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(4, 1, 2, 9, result);
}
- @Test
+ @TestTemplate
public void testRewritePathWithNonLiveEntry() throws Exception {
String location = newTableLocation();
// first overwrite generate 1 manifest and 1 data file
@@ -729,7 +759,7 @@ public class TestRewriteTablePathsAction extends TestBase {
assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation);
}
- @Test
+ @TestTemplate
public void testStartSnapshotWithoutValidSnapshot() throws Exception {
// expire one snapshot
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -748,7 +778,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(2, 1, 1, 5, result);
}
- @Test
+ @TestTemplate
public void testMoveTheVersionExpireSnapshot() throws Exception {
// expire one snapshot
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -766,7 +796,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(1, 0, 0, 1, result);
}
- @Test
+ @TestTemplate
public void testMoveVersionWithInvalidSnapshots() {
// expire one snapshot
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -785,7 +815,7 @@ public class TestRewriteTablePathsAction extends TestBase {
+ "Please choose an earlier version without invalid snapshots.");
}
- @Test
+ @TestTemplate
public void testRollBack() throws Exception {
long secondSnapshotId = table.currentSnapshot().snapshotId();
@@ -812,7 +842,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(6, 3, 3, 15, result);
}
- @Test
+ @TestTemplate
public void testWriteAuditPublish() throws Exception {
// enable WAP
table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit();
@@ -837,7 +867,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(5, 3, 3, 14, result);
}
- @Test
+ @TestTemplate
public void testSchemaChange() throws Exception {
// change the schema
table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
@@ -854,7 +884,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(4, 2, 2, 10, result);
}
- @Test
+ @TestTemplate
public void testSnapshotIdInheritanceEnabled() throws Exception {
String sourceTableLocation = newTableLocation();
Map<String, String> properties = Maps.newHashMap();
@@ -872,7 +902,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(3, 2, 2, 9, result);
}
- @Test
+ @TestTemplate
public void testMetadataCompression() throws Exception {
String sourceTableLocation = newTableLocation();
Map<String, String> properties = Maps.newHashMap();
@@ -898,7 +928,7 @@ public class TestRewriteTablePathsAction extends TestBase {
checkFileNum(2, 2, 2, 8, result);
}
- @Test
+ @TestTemplate
public void testInvalidArgs() {
RewriteTablePath actions = actions().rewriteTablePath(table);
@@ -931,12 +961,12 @@ public class TestRewriteTablePathsAction extends TestBase
{
.hasMessageContaining("End version('null') cannot be empty");
}
- @Test
+ @TestTemplate
public void testTableWithManyPartitionStatisticFile() throws IOException {
String sourceTableLocation = newTableLocation();
- Map<String, String> properties = Maps.newHashMap();
- properties.put("format-version", "2");
- String tableName = "v2tblwithPartStats";
+ Map<String, String> properties =
+ ImmutableMap.of(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion));
+ String tableName = String.format("v%stblwithPartStats", formatVersion);
Table sourceTable =
createMetastoreTable(sourceTableLocation, properties, "default",
tableName, 0, "c1");
@@ -963,12 +993,12 @@ public class TestRewriteTablePathsAction extends TestBase
{
result, "partition-stats", sourceTableLocation, targetTableLocation);
}
- @Test
+ @TestTemplate
public void testTableWithManyStatisticFiles() throws IOException {
String sourceTableLocation = newTableLocation();
- Map<String, String> properties = Maps.newHashMap();
- properties.put("format-version", "2");
- String tableName = "v2tblwithmanystats";
+ Map<String, String> properties =
+ ImmutableMap.of(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion));
+ String tableName = String.format("v%stblwithmanystats", formatVersion);
Table sourceTable =
createMetastoreTable(sourceTableLocation, properties, "default",
tableName, 0);
@@ -992,12 +1022,12 @@ public class TestRewriteTablePathsAction extends
TestBase {
iterations * 2 + 1, iterations, iterations, iterations, iterations * 6
+ 1, result);
}
- @Test
+ @TestTemplate
public void testStatisticsFileSourcePath() throws IOException {
String sourceTableLocation = newTableLocation();
- Map<String, String> properties = Maps.newHashMap();
- properties.put("format-version", "2");
- String tableName = "v2tblwithstats";
+ Map<String, String> properties =
+ ImmutableMap.of(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion));
+ String tableName = String.format("v%stblwithstats", formatVersion);
Table sourceTable =
createMetastoreTable(sourceTableLocation, properties, "default",
tableName, 1);
@@ -1020,13 +1050,17 @@ public class TestRewriteTablePathsAction extends
TestBase {
findAndAssertFileInFileList(result, ".stats", sourceTableLocation,
targetTableLocation);
}
- @Test
+ @TestTemplate
public void testMetadataCompressionWithMetastoreTable() throws Exception {
Map<String, String> properties = Maps.newHashMap();
properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
Table sourceTable =
createMetastoreTable(
- newTableLocation(), properties, "default",
"testMetadataCompression", 2);
+ newTableLocation(),
+ properties,
+ "default",
+ String.format("v%sMetadataCompression", formatVersion),
+ 2);
TableMetadata currentMetadata = currentMetadata(sourceTable);
@@ -1054,10 +1088,11 @@ public class TestRewriteTablePathsAction extends
TestBase {
}
// Metastore table tests
- @Test
+ @TestTemplate
public void testMetadataLocationChange() throws Exception {
+ String tableName = String.format("v%stblWithLocation", formatVersion);
Table sourceTable =
- createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default",
"tbl", 1);
+ createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default",
tableName, 1);
String metadataFilePath =
currentMetadata(sourceTable).metadataFileLocation();
String newMetadataDir = "new-metadata-dir";
@@ -1066,7 +1101,7 @@ public class TestRewriteTablePathsAction extends TestBase
{
.set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() +
newMetadataDir)
.commit();
- spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')");
+ sql("insert into hive.default.%s values (1, 'AAAAAAAAAA', 'AAAA')",
tableName);
sourceTable.refresh();
// copy table
@@ -1099,12 +1134,15 @@ public class TestRewriteTablePathsAction extends
TestBase {
checkFileNum(2, 1, 1, 5, result2);
}
- @Test
+ @TestTemplate
public void testDeleteFrom() throws Exception {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("format-version", "2");
- properties.put("write.delete.mode", "merge-on-read");
- String tableName = "v2tbl";
+ Map<String, String> properties =
+ ImmutableMap.of(
+ TableProperties.FORMAT_VERSION,
+ String.valueOf(formatVersion),
+ "write.delete.mode",
+ "merge-on-read");
+ String tableName = String.format("v%stbl", formatVersion);
Table sourceTable =
createMetastoreTable(newTableLocation(), properties, "default",
tableName, 0);
// ingest data
@@ -1152,7 +1190,7 @@ public class TestRewriteTablePathsAction extends TestBase
{
// register table
String metadataLocation =
currentMetadata(sourceTable).metadataFileLocation();
String versionFile = fileName(metadataLocation);
- String targetTableName = "copiedV2Table";
+ String targetTableName = String.format("copiedV%sTable", formatVersion);
TableIdentifier tableIdentifier = TableIdentifier.of("default",
targetTableName);
catalog.registerTable(tableIdentifier, targetTableLocation() +
"/metadata/" + versionFile);
@@ -1168,7 +1206,7 @@ public class TestRewriteTablePathsAction extends TestBase
{
assertEquals("Rows must match", originalData, copiedData);
}
- @Test
+ @TestTemplate
public void testKryoDeserializeBroadcastValues() {
sparkContext.getConf().set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
RewriteTablePathSparkAction action =
@@ -1179,8 +1217,11 @@ public class TestRewriteTablePathsAction extends
TestBase {
assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
}
- @Test
+ @TestTemplate
public void testNestedDirectoryStructurePreservation() throws Exception {
+ assumeThat(formatVersion)
+ .as("Can't add multiple DVs for the same data file in v3")
+ .isEqualTo(2);
String sourceTableLocation = newTableLocation();
Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);
@@ -1217,12 +1258,18 @@ public class TestRewriteTablePathsAction extends
TestBase {
DeleteFile positionDeletes1 =
FileHelpers.writeDeleteFile(
- sourceTable,
sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+ sourceTable,
+ sourceTable.io().newOutputFile(file1.toURI().toString()),
+ deletes1,
+ formatVersion)
.first();
DeleteFile positionDeletes2 =
FileHelpers.writeDeleteFile(
- sourceTable,
sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+ sourceTable,
+ sourceTable.io().newOutputFile(file2.toURI().toString()),
+ deletes2,
+ formatVersion)
.first();
sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
@@ -1266,7 +1313,7 @@ public class TestRewriteTablePathsAction extends TestBase
{
assertThat(targetPath2).startsWith(targetTableLocation());
}
- @Test
+ @TestTemplate
public void testRewritePathWithoutCreateFileList() throws Exception {
String targetTableLocation = targetTableLocation();