This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.10.x by this push:
     new 93cb59ace3 Spark 3.4: Pass format-version when creating a snapshot (#14170) (#14171)
93cb59ace3 is described below
commit 93cb59ace3a299c41104aa5c7d311b8bfbff4e36
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Sep 24 01:42:17 2025 +0200
Spark 3.4: Pass format-version when creating a snapshot (#14170) (#14171)
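
Before this change, buildManifest always created v1 manifests through the
two-argument ManifestFiles.write(spec, outputFile) overload, so snapshotting
a source table into a v2 Iceberg table produced manifests with the wrong
format version. The writer now receives the target table's format version,
plus a placeholder snapshot ID (-1) for v1 tables without snapshot-id
inheritance; the placeholder is reassigned when the manifest is committed,
while v2 manifests leave the field null so readers inherit it from the
manifest list. In outline (a sketch of the change, not the exact code below):

    int formatVersion = ops.current().formatVersion();
    Long snapshotId =
        (formatVersion == 1 && !snapshotIdInheritanceEnabled) ? -1L : null;
    ManifestWriter<DataFile> writer =
        ManifestFiles.write(formatVersion, spec, outputFile, snapshotId);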
---
 .../extensions/TestSnapshotTableProcedure.java    | 94 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableUtil.java  | 37 ++++++---
 2 files changed, 121 insertions(+), 10 deletions(-)
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
index 2ca8de50fa..d5d7dd1993 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
@@ -27,10 +27,18 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.List;
 import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.avro.GenericAvroReader;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.AnalysisException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.TestTemplate;
@@ -263,6 +271,14 @@ public class TestSnapshotTableProcedure extends ExtensionsTestBase {
         .hasMessage("Parallelism should be larger than 0");
   }
 
+  private static final Schema SNAPSHOT_ID_READ_SCHEMA =
+      new Schema(
+          Types.NestedField.required("snapshot_id")
+              .withId(1)
+              .ofType(Types.LongType.get())
+              .asOptional()
+              .build());
+
   @TestTemplate
   public void testSnapshotPartitionedWithParallelism() throws IOException {
     String location = Files.createTempDirectory(temp, "junit").toFile().toString();
@@ -282,4 +298,82 @@ public class TestSnapshotTableProcedure extends ExtensionsTestBase {
         ImmutableList.of(row("a", 1L), row("b", 2L)),
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
+
+  @TestTemplate
+  public void testSnapshotPartitioned() throws IOException {
+    String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'",
+        SOURCE_NAME, location);
+    sql("INSERT INTO TABLE %s (id, data) VALUES (1, 'a'), (2, 'b')", SOURCE_NAME);
+    assertThat(
+            sql(
+                "CALL %s.system.snapshot(source_table => '%s', table => '%s')",
+                catalogName, SOURCE_NAME, tableName))
+        .containsExactly(row(2L));
+    assertThat(sql("SELECT * FROM %s ORDER BY id", tableName))
+        .containsExactly(row("a", 1L), row("b", 2L));
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row("a", 1L), row("b", 2L)),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+
+    for (ManifestFile manifest : createdTable.currentSnapshot().dataManifests(createdTable.io())) {
+      try (AvroIterable<GenericData.Record> reader =
+          Avro.read(org.apache.iceberg.Files.localInput(manifest.path()))
+              .project(SNAPSHOT_ID_READ_SCHEMA)
+              .createResolvingReader(GenericAvroReader::create)
+              .build()) {
+
+        assertThat(reader.getMetadata()).containsEntry("format-version", "2");
+
+        List<GenericData.Record> records = Lists.newArrayList(reader.iterator());
+        for (GenericData.Record row : records) {
+          assertThat(row.get(0)).as("Snapshot ID should be inherited").isNull();
+        }
+      }
+    }
+  }
+
+  @TestTemplate
+  public void testSnapshotPartitionedV1() throws IOException {
+    String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'",
+        SOURCE_NAME, location);
+    sql("INSERT INTO TABLE %s (id, data) VALUES (1, 'a'), (2, 'b')", SOURCE_NAME);
+    assertThat(
+            sql(
+                "CALL %s.system.snapshot(source_table => '%s', table => '%s', properties => map('format-version', '1'))",
+                catalogName, SOURCE_NAME, tableName))
+        .containsExactly(row(2L));
+    assertThat(sql("SELECT * FROM %s ORDER BY id", tableName))
+        .containsExactly(row("a", 1L), row("b", 2L));
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row("a", 1L), row("b", 2L)),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+
+    for (ManifestFile manifest : createdTable.currentSnapshot().dataManifests(createdTable.io())) {
+      try (AvroIterable<GenericData.Record> reader =
+          Avro.read(org.apache.iceberg.Files.localInput(manifest.path()))
+              .project(SNAPSHOT_ID_READ_SCHEMA)
+              .createResolvingReader(GenericAvroReader::create)
+              .build()) {
+
+        assertThat(reader.getMetadata()).containsEntry("format-version", "1");
+
+        List<GenericData.Record> records = Lists.newArrayList(reader.iterator());
+        for (GenericData.Record row : records) {
+          assertThat(row.get(0)).as("Snapshot ID should not be inherited").isNotNull();
+        }
+      }
+    }
+  }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index ae819020d8..ade2bd5b5c 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -362,6 +362,8 @@ public class SparkTableUtil {
   }
 
   private static Iterator<ManifestFile> buildManifest(
+      int formatVersion,
+      Long snapshotId,
       SerializableConfiguration conf,
       PartitionSpec spec,
       String basePath,
@@ -379,7 +381,8 @@
     Path location = new Path(basePath, suffix);
     String outputPath = FileFormat.AVRO.addExtension(location.toString());
     OutputFile outputFile = io.newOutputFile(outputPath);
-    ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
+    ManifestWriter<DataFile> writer =
+        ManifestFiles.write(formatVersion, spec, outputFile, snapshotId);
 
     try (ManifestWriter<DataFile> writerRef = writer) {
       fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
@@ -867,6 +870,21 @@ public class SparkTableUtil {
               DUPLICATE_FILE_MESSAGE, Joiner.on(",").join((String[]) duplicates.take(10))));
     }
 
+    TableOperations ops = ((HasTableOperations) targetTable).operations();
+    int formatVersion = ops.current().formatVersion();
+    boolean snapshotIdInheritanceEnabled =
+        PropertyUtil.propertyAsBoolean(
+            targetTable.properties(),
+            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
+            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
+
+    final Long snapshotId;
+    if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
+      snapshotId = -1L;
+    } else {
+      snapshotId = null;
+    }
+
     List<ManifestFile> manifests =
         filesToImport
             .repartition(numShufflePartitions)
@@ -877,19 +895,18 @@ public class SparkTableUtil {
             .orderBy(col("_1"))
             .mapPartitions(
                 (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
-                    fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple),
+                    fileTuple ->
+                        buildManifest(
+                            formatVersion,
+                            snapshotId,
+                            serializableConf,
+                            spec,
+                            stagingDir,
+                            fileTuple),
                 Encoders.javaSerialization(ManifestFile.class))
             .collectAsList();
 
     try {
-      TableOperations ops = ((HasTableOperations) targetTable).operations();
-      int formatVersion = ops.current().formatVersion();
-      boolean snapshotIdInheritanceEnabled =
-          PropertyUtil.propertyAsBoolean(
-              targetTable.properties(),
-              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
-              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
-
       AppendFiles append = targetTable.newAppend();
       manifests.forEach(append::appendManifest);
       append.commit();
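
For reference, the code path added above can be exercised end to end from a
Spark session with the Iceberg extensions enabled, mirroring the new tests;
the catalog and table names here are illustrative:

    // snapshot a Spark table into a v1 Iceberg table (illustrative names)
    spark.sql(
        "CALL spark_catalog.system.snapshot("
            + "source_table => 'db.source_table', "
            + "table => 'db.snapshot_table', "
            + "properties => map('format-version', '1'))");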