This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4a83228410 [iceberg] Enhance iceberg snapshot metadata. (#6354)
4a83228410 is described below
commit 4a83228410e6b3fd14042516b96c462a06f3fa98
Author: Jiajia Li <[email protected]>
AuthorDate: Tue Sep 30 09:43:13 2025 +0800
[iceberg] Enhance iceberg snapshot metadata. (#6354)
---
.../paimon/iceberg/IcebergCommitCallback.java | 10 +-
.../paimon/iceberg/metadata/IcebergMetadata.java | 2 +
.../paimon/iceberg/metadata/IcebergSnapshot.java | 59 ++-
.../iceberg/IcebergRestMetadataCommitter.java | 5 +-
.../apache/paimon/iceberg/IcebergMetadataTest.java | 554 +++++++++++++++++++++
.../iceberg/IcebergRestMetadataCommitterTest.java | 65 +++
6 files changed, 689 insertions(+), 6 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index c8a3717025..e8f85c662f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -339,10 +339,13 @@ public class IcebergCommitCallback implements CommitCallback, TagCallback {
new IcebergSnapshot(
snapshotId,
snapshotId,
+ null,
System.currentTimeMillis(),
IcebergSnapshotSummary.APPEND,
pathFactory.toManifestListPath(manifestListFileName).toString(),
- schemaId);
+ schemaId,
+ null,
+ null);
        // Tags can only be included in Iceberg if they point to an Iceberg snapshot that
        // exists. Otherwise an Iceberg client fails to parse the metadata and all reads fail.
@@ -599,10 +602,13 @@ public class IcebergCommitCallback implements CommitCallback, TagCallback {
new IcebergSnapshot(
snapshotId,
snapshotId,
+ snapshotId - 1,
System.currentTimeMillis(),
snapshotSummary,
pathFactory.toManifestListPath(manifestListFileName).toString(),
- schemaId));
+ schemaId,
+ null,
+ null));
// all snapshots in this list, except the last one, need to expire
List<IcebergSnapshot> toExpireExceptLast = new ArrayList<>();
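
Taken together, the two hunks above encode a simple lineage rule: the snapshot written while creating base metadata carries no parent, and every later commit points at the immediately preceding Paimon snapshot id. A minimal restatement (illustrative only; the boolean flag below is hypothetical, not a name from the patch):

    // parent-snapshot-id rule applied by IcebergCommitCallback:
    Long parentSnapshotId = creatingBaseMetadata ? null : snapshotId - 1;
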
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
index c755e32c7d..0aeef94247 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
@@ -319,6 +319,7 @@ public class IcebergMetadata {
defaultSortOrderId,
snapshots,
currentSnapshotId,
+ properties,
refs);
}
@@ -347,6 +348,7 @@ public class IcebergMetadata {
&& defaultSortOrderId == that.defaultSortOrderId
&& Objects.equals(snapshots, that.snapshots)
&& currentSnapshotId == that.currentSnapshotId
+ && Objects.equals(properties, that.properties)
&& Objects.equals(refs, that.refs);
}
}
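
With properties now passed through the copy constructor call and compared in equals(), a metadata rewrite that changes only table properties is no longer considered equal to its predecessor. A one-line sketch, where m1 and m2 are hypothetical IcebergMetadata instances identical except for their properties map:

    boolean propertyOnlyChangeDetected = !m1.equals(m2); // false before this change, true after
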
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
index df0224d22b..d6a0dd8f21 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
@@ -21,8 +21,11 @@ package org.apache.paimon.iceberg.metadata;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
import java.util.Objects;
/**
@@ -35,10 +38,13 @@ public class IcebergSnapshot {
private static final String FIELD_SEQUENCE_NUMBER = "sequence-number";
private static final String FIELD_SNAPSHOT_ID = "snapshot-id";
+    private static final String FIELD_PARENT_SNAPSHOT_ID = "parent-snapshot-id";
private static final String FIELD_TIMESTAMP_MS = "timestamp-ms";
private static final String FIELD_SUMMARY = "summary";
private static final String FIELD_MANIFEST_LIST = "manifest-list";
private static final String FIELD_SCHEMA_ID = "schema-id";
+ private static final String FIELD_FIRST_ROW_ID = "first-row-id";
+ private static final String FIELD_ADDED_ROWS = "added-rows";
@JsonProperty(FIELD_SEQUENCE_NUMBER)
private final long sequenceNumber;
@@ -46,6 +52,11 @@ public class IcebergSnapshot {
@JsonProperty(FIELD_SNAPSHOT_ID)
private final long snapshotId;
+ @JsonProperty(FIELD_PARENT_SNAPSHOT_ID)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final Long parentSnapshotId;
+
@JsonProperty(FIELD_TIMESTAMP_MS)
private final long timestampMs;
@@ -58,20 +69,36 @@ public class IcebergSnapshot {
@JsonProperty(FIELD_SCHEMA_ID)
private final int schemaId;
+ @JsonProperty(FIELD_FIRST_ROW_ID)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final Long firstRowId;
+
+ @JsonProperty(FIELD_ADDED_ROWS)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final Long addedRows;
+
@JsonCreator
public IcebergSnapshot(
@JsonProperty(FIELD_SEQUENCE_NUMBER) long sequenceNumber,
@JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId,
+ @JsonProperty(FIELD_PARENT_SNAPSHOT_ID) Long parentSnapshotId,
@JsonProperty(FIELD_TIMESTAMP_MS) long timestampMs,
@JsonProperty(FIELD_SUMMARY) IcebergSnapshotSummary summary,
@JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
- @JsonProperty(FIELD_SCHEMA_ID) int schemaId) {
+ @JsonProperty(FIELD_SCHEMA_ID) int schemaId,
+ @JsonProperty(FIELD_FIRST_ROW_ID) Long firstRowId,
+ @JsonProperty(FIELD_ADDED_ROWS) Long addedRows) {
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
+ this.parentSnapshotId = parentSnapshotId;
this.timestampMs = timestampMs;
this.summary = summary;
this.manifestList = manifestList;
this.schemaId = schemaId;
+ this.firstRowId = firstRowId;
+ this.addedRows = addedRows;
}
@JsonGetter(FIELD_SEQUENCE_NUMBER)
@@ -104,10 +131,33 @@ public class IcebergSnapshot {
return schemaId;
}
+ @JsonGetter(FIELD_PARENT_SNAPSHOT_ID)
+ public Long parentSnapshotId() {
+ return parentSnapshotId;
+ }
+
+ @JsonGetter(FIELD_ADDED_ROWS)
+ public Long addedRows() {
+ return addedRows;
+ }
+
+ @JsonGetter(FIELD_FIRST_ROW_ID)
+ public Long firstRowId() {
+ return firstRowId;
+ }
+
@Override
public int hashCode() {
return Objects.hash(
-                sequenceNumber, snapshotId, timestampMs, summary, manifestList, schemaId);
+ sequenceNumber,
+ snapshotId,
+ parentSnapshotId,
+ timestampMs,
+ summary,
+ manifestList,
+ schemaId,
+ addedRows,
+ firstRowId);
}
@Override
@@ -122,9 +172,12 @@ public class IcebergSnapshot {
IcebergSnapshot that = (IcebergSnapshot) o;
return sequenceNumber == that.sequenceNumber
&& snapshotId == that.snapshotId
+ && Objects.equals(parentSnapshotId, that.parentSnapshotId)
&& timestampMs == that.timestampMs
&& Objects.equals(summary, that.summary)
&& Objects.equals(manifestList, that.manifestList)
- && schemaId == that.schemaId;
+ && schemaId == that.schemaId
+ && Objects.equals(addedRows, that.addedRows)
+ && Objects.equals(firstRowId, that.firstRowId);
}
}
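
Because the three new fields are nullable and marked JsonInclude.Include.NON_NULL, metadata written before this change still parses, and a first snapshot simply omits parent-snapshot-id from its JSON. A minimal round-trip sketch, assuming Paimon's shaded Jackson ObjectMapper is available and using illustrative values throughout:

    import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
    import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
    import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

    ObjectMapper mapper = new ObjectMapper();
    IcebergSnapshot snapshot =
            new IcebergSnapshot(
                    2L,                     // sequence-number
                    2L,                     // snapshot-id
                    1L,                     // parent-snapshot-id
                    1727660593000L,         // timestamp-ms (hypothetical)
                    IcebergSnapshotSummary.APPEND,
                    "metadata/snap-2.avro", // manifest-list (hypothetical path)
                    0,                      // schema-id
                    0L,                     // first-row-id (format v3 row lineage)
                    10L);                   // added-rows (format v3 row lineage)
    String json = mapper.writeValueAsString(snapshot);
    // json should contain "parent-snapshot-id":1, "first-row-id":0 and
    // "added-rows":10; fields passed as null would be omitted entirely.
    IcebergSnapshot parsed = mapper.readValue(json, IcebergSnapshot.class);
    // parsed.equals(snapshot) should hold, since equals() covers the new fields.
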
diff --git a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
index 1fa0db2441..119ddcd743 100644
--- a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
+++ b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
@@ -374,10 +374,13 @@ public class IcebergRestMetadataCommitter implements IcebergMetadataCommitter {
new IcebergSnapshot(
snapshot.sequenceNumber(),
snapshot.snapshotId(),
+ snapshot.parentSnapshotId(),
snapshot.timestampMs(),
snapshot.summary(),
snapshot.manifestList(),
- snapshot.schemaId() + 1))
+ snapshot.schemaId() + 1,
+ snapshot.firstRowId(),
+ snapshot.addedRows()))
.collect(Collectors.toList());
return new IcebergMetadata(
newIcebergMetadata.formatVersion(),
diff --git a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
new file mode 100644
index 0000000000..3fc1aad2cd
--- /dev/null
+++ b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.iceberg.metadata.IcebergSchema;
+import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
+import org.apache.paimon.options.Options;
+
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.RESTCatalogServer;
+import org.apache.iceberg.rest.RESTServerExtension;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class IcebergMetadataTest {
+ @RegisterExtension
+ private static final RESTServerExtension REST_SERVER_EXTENSION =
+ new RESTServerExtension(
+ Map.of(
+ RESTCatalogServer.REST_PORT,
+ RESTServerExtension.FREE_PORT,
+ CatalogProperties.CLIENT_POOL_SIZE,
+ "1",
+ CatalogProperties.CATALOG_IMPL,
+ HadoopCatalog.class.getName()));
+
+ protected static RESTCatalog restCatalog;
+
+ @BeforeEach
+ void setUp() {
+ restCatalog = REST_SERVER_EXTENSION.client();
+ }
+
+ @Test
+ @DisplayName("Test reading metadata from basic Iceberg table creation")
+ void testReadBasicIcebergTableMetadata() throws Exception {
+ // Create a basic Iceberg table
+ Table icebergTable = createBasicIcebergTable("basic_table");
+
+ // Read metadata using Paimon's IcebergMetadata
+        IcebergMetadata paimonIcebergMetadata = readIcebergMetadata(icebergTable);
+
+ // Verify basic properties
+ assertThat(paimonIcebergMetadata.formatVersion()).isEqualTo(2);
+ assertThat(paimonIcebergMetadata.tableUuid()).isNotNull();
+ assertThat(paimonIcebergMetadata.location()).isNotNull();
+ assertThat(paimonIcebergMetadata.currentSchemaId()).isEqualTo(0);
+ assertThat(paimonIcebergMetadata.defaultSpecId()).isEqualTo(0);
+ assertThat(paimonIcebergMetadata.defaultSortOrderId()).isEqualTo(0);
+
+ // Verify schema
+ assertThat(paimonIcebergMetadata.schemas()).hasSize(1);
+ IcebergSchema schema = paimonIcebergMetadata.schemas().get(0);
+ assertThat(schema.fields()).hasSize(3);
+
+ // Verify field details
+ assertThat(schema.fields().get(0).name()).isEqualTo("id");
+ assertThat(schema.fields().get(0).type()).isEqualTo("int");
+ assertThat(schema.fields().get(0).required()).isTrue();
+
+ assertThat(schema.fields().get(1).name()).isEqualTo("name");
+ assertThat(schema.fields().get(1).type()).isEqualTo("string");
+ assertThat(schema.fields().get(1).required()).isTrue();
+
+ assertThat(schema.fields().get(2).name()).isEqualTo("age");
+ assertThat(schema.fields().get(2).type()).isEqualTo("int");
+ assertThat(schema.fields().get(2).required()).isFalse();
+
+ // Verify partition specs (should be unpartitioned)
+        assertThat(paimonIcebergMetadata.partitionSpecs()).hasSize(1);
+        assertThat(paimonIcebergMetadata.partitionSpecs().get(0).fields()).isEmpty();
+
+ // Verify snapshots (should be empty initially)
+ assertThat(paimonIcebergMetadata.snapshots()).isEmpty();
+ assertThat(paimonIcebergMetadata.currentSnapshotId()).isEqualTo(-1);
+ }
+
+ @Test
+ @DisplayName("Test reading metadata from partitioned Iceberg table")
+ void testReadPartitionedIcebergTableMetadata() throws Exception {
+ // Create a partitioned Iceberg table
+        Table icebergTable = createPartitionedIcebergTable("partitioned_table");
+
+        // Read metadata using Paimon's IcebergMetadata
+        IcebergMetadata paimonIcebergMetadata = readIcebergMetadata(icebergTable);
+
+ // Verify basic properties
+ assertThat(paimonIcebergMetadata.formatVersion()).isEqualTo(2);
+ assertThat(paimonIcebergMetadata.tableUuid()).isNotNull();
+ assertThat(paimonIcebergMetadata.location()).isNotNull();
+
+ // Verify schema
+ assertThat(paimonIcebergMetadata.schemas()).hasSize(1);
+ IcebergSchema schema = paimonIcebergMetadata.schemas().get(0);
+ assertThat(schema.fields()).hasSize(4);
+
+ // Verify field details
+ assertThat(schema.fields().get(0).name()).isEqualTo("id");
+ assertThat(schema.fields().get(1).name()).isEqualTo("name");
+ assertThat(schema.fields().get(2).name()).isEqualTo("department");
+ assertThat(schema.fields().get(3).name()).isEqualTo("salary");
+
+ // Verify partition specs
+ assertThat(paimonIcebergMetadata.partitionSpecs()).hasSize(1);
+        IcebergPartitionSpec partitionSpec = paimonIcebergMetadata.partitionSpecs().get(0);
+        assertThat(partitionSpec.fields()).hasSize(1);
+        assertThat(partitionSpec.fields().get(0).name()).isEqualTo("department");
+        assertThat(partitionSpec.fields().get(0).transform()).isEqualTo("identity");
+
+ // Verify snapshots (should be empty initially)
+ assertThat(paimonIcebergMetadata.snapshots()).isEmpty();
+ assertThat(paimonIcebergMetadata.currentSnapshotId()).isEqualTo(-1);
+ }
+
+ @Test
+ @DisplayName("Test reading metadata from sorted Iceberg table")
+ void testReadSortedIcebergTableMetadata() throws Exception {
+ // Create a sorted Iceberg table
+ Table icebergTable = createSortedIcebergTable("sorted_table");
+
+ // Read metadata using Paimon's IcebergMetadata
+        IcebergMetadata paimonIcebergMetadata = readIcebergMetadata(icebergTable);
+
+ // Verify basic properties
+ assertThat(paimonIcebergMetadata.formatVersion()).isEqualTo(2);
+ assertThat(paimonIcebergMetadata.tableUuid()).isNotNull();
+ assertThat(paimonIcebergMetadata.location()).isNotNull();
+
+ // Verify schema
+ assertThat(paimonIcebergMetadata.schemas()).hasSize(1);
+ IcebergSchema schema = paimonIcebergMetadata.schemas().get(0);
+ assertThat(schema.fields()).hasSize(3);
+
+ // Verify field details
+ assertThat(schema.fields().get(0).name()).isEqualTo("id");
+ assertThat(schema.fields().get(1).name()).isEqualTo("name");
+ assertThat(schema.fields().get(2).name()).isEqualTo("score");
+
+ // Verify sort orders
+ assertThat(paimonIcebergMetadata.sortOrders()).hasSize(1);
+ assertThat(paimonIcebergMetadata.defaultSortOrderId()).isEqualTo(1);
+
+ // Verify snapshots (should be empty initially)
+ assertThat(paimonIcebergMetadata.snapshots()).isEmpty();
+ assertThat(paimonIcebergMetadata.currentSnapshotId()).isEqualTo(-1);
+ }
+
+ @Test
+ @DisplayName("Test reading metadata after Iceberg table operations")
+ void testReadMetadataAfterIcebergOperations() throws Exception {
+ // Create a basic Iceberg table
+ Table icebergTable = createBasicIcebergTable("operations_table");
+
+ // Perform first append operation
+ icebergTable
+ .newFastAppend()
+ .appendFile(
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(100)
+ .withRecordCount(10)
+ .build())
+ .commit();
+
+ // Read metadata after first operation
+        IcebergMetadata paimonIcebergMetadata1 = readIcebergMetadata("operations_table");
+
+ // Verify snapshots after first operation
+        assertThat(paimonIcebergMetadata1.snapshots()).hasSize(1);
+        assertThat(paimonIcebergMetadata1.currentSnapshotId()).isNotNull();
+        assertThat(paimonIcebergMetadata1.snapshots().get(0).parentSnapshotId()).isNull();
+
+ // Perform second append operation
+ icebergTable
+ .newFastAppend()
+ .appendFile(
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-b.parquet")
+ .withFileSizeInBytes(200)
+ .withRecordCount(20)
+ .build())
+ .commit();
+
+ // Read metadata after second operation
+        IcebergMetadata paimonIcebergMetadata2 = readIcebergMetadata("operations_table");
+
+ // Verify snapshots after second operation
+ assertThat(paimonIcebergMetadata2.snapshots()).hasSize(2);
+        assertThat(paimonIcebergMetadata2.currentSnapshotId())
+                .isEqualTo(paimonIcebergMetadata2.snapshots().get(1).snapshotId());
+        assertThat(paimonIcebergMetadata2.snapshots().get(1).parentSnapshotId())
+                .isEqualTo(paimonIcebergMetadata2.snapshots().get(0).snapshotId());
+
+        // Verify snapshot sequence numbers
+        assertThat(paimonIcebergMetadata2.snapshots().get(0).sequenceNumber()).isEqualTo(1L);
+        assertThat(paimonIcebergMetadata2.snapshots().get(1).sequenceNumber()).isEqualTo(2L);
+ }
+
+ @Test
+ @DisplayName("Test reading metadata with Iceberg table properties")
+ void testReadMetadataWithIcebergTableProperties() throws Exception {
+ // Create Iceberg table with custom properties
+        TableIdentifier identifier = TableIdentifier.of("testdb", "properties_table");
+        Schema schema =
+                new Schema(
+                        Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                        Types.NestedField.required(2, "name", Types.StringType.get()));
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("write.format.default", "parquet");
+        properties.put("write.parquet.compression-codec", "snappy");
+        properties.put("write.target-file-size-bytes", "134217728");
+
+        Table icebergTable =
+                restCatalog.buildTable(identifier, schema).withProperties(properties).create();
+
+        // Read metadata using Paimon's IcebergMetadata
+        IcebergMetadata paimonIcebergMetadata = readIcebergMetadata(icebergTable);
+
+        // Verify properties
+        assertThat(paimonIcebergMetadata.properties()).isNotEmpty();
+        assertThat(paimonIcebergMetadata.properties().get("write.format.default"))
+                .isEqualTo("parquet");
+        assertThat(paimonIcebergMetadata.properties().get("write.parquet.compression-codec"))
+                .isEqualTo("snappy");
+        assertThat(paimonIcebergMetadata.properties().get("write.target-file-size-bytes"))
+                .isEqualTo("134217728");
+ }
+
+ @Test
+ @DisplayName("Test reading metadata with complex Iceberg schema")
+ void testReadMetadataWithComplexIcebergSchema() throws Exception {
+ // Create Iceberg table with complex schema
+        TableIdentifier identifier = TableIdentifier.of("testdb", "complex_schema_table");
+        Schema schema =
+                new Schema(
+                        Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                        Types.NestedField.required(2, "name", Types.StringType.get()),
+                        Types.NestedField.optional(
+                                3,
+                                "address",
+                                Types.StructType.of(
+                                        Types.NestedField.required(
+                                                4, "street", Types.StringType.get()),
+                                        Types.NestedField.required(
+                                                5, "city", Types.StringType.get()),
+                                        Types.NestedField.optional(
+                                                6, "zipcode", Types.StringType.get()))),
+                        Types.NestedField.optional(
+                                7, "tags", Types.ListType.ofOptional(8, Types.StringType.get())),
+                        Types.NestedField.optional(
+                                9,
+                                "metadata",
+                                Types.MapType.ofOptional(
+                                        10, 11, Types.StringType.get(), Types.StringType.get())));
+
+        Table icebergTable = restCatalog.buildTable(identifier, schema).create();
+
+        // Read metadata using Paimon's IcebergMetadata
+        IcebergMetadata paimonIcebergMetadata = readIcebergMetadata(icebergTable);
+
+        // Verify schema
+        assertThat(paimonIcebergMetadata.schemas()).hasSize(1);
+        IcebergSchema paimonIcebergSchema = paimonIcebergMetadata.schemas().get(0);
+        assertThat(paimonIcebergSchema.fields()).hasSize(5);
+
+        // Verify basic fields
+        assertThat(paimonIcebergSchema.fields().get(0).name()).isEqualTo("id");
+        assertThat(paimonIcebergSchema.fields().get(0).type()).isEqualTo("int");
+        assertThat(paimonIcebergSchema.fields().get(1).name()).isEqualTo("name");
+        assertThat(paimonIcebergSchema.fields().get(1).type()).isEqualTo("string");
+
+        // Verify complex fields exist
+        assertThat(paimonIcebergSchema.fields().get(2).name()).isEqualTo("address");
+        assertThat(paimonIcebergSchema.fields().get(3).name()).isEqualTo("tags");
+        assertThat(paimonIcebergSchema.fields().get(4).name()).isEqualTo("metadata");
+ }
+
+ @Test
+ @DisplayName("Test reading metadata with Iceberg table evolution")
+ void testReadMetadataWithIcebergTableEvolution() throws Exception {
+ // Create initial Iceberg table
+ Table icebergTable = createBasicIcebergTable("evolution_table");
+
+ // Read initial metadata
+        IcebergMetadata initialMetadata = readIcebergMetadata("evolution_table");
+ assertThat(initialMetadata.schemas()).hasSize(1);
+ assertThat(initialMetadata.schemas().get(0).fields()).hasSize(3);
+ assertThat(initialMetadata.currentSchemaId()).isEqualTo(0);
+
+ // Evolve schema by adding a new column
+        icebergTable.updateSchema().addColumn("email", Types.StringType.get()).commit();
+
+ // Read metadata after schema evolution
+        IcebergMetadata evolvedMetadata = readIcebergMetadata("evolution_table");
+
+ // Verify schema evolution
+        assertThat(evolvedMetadata.schemas()).hasSize(2); // Should have 2 schemas now
+ assertThat(evolvedMetadata.currentSchemaId())
+ .isEqualTo(1); // Current schema should be the new one
+
+ // Verify the current schema has the new field
+ IcebergSchema currentSchema =
+ evolvedMetadata.schemas().stream()
+                        .filter(schema -> schema.schemaId() == evolvedMetadata.currentSchemaId())
+ .findFirst()
+ .orElseThrow();
+ assertThat(currentSchema.fields()).hasSize(4);
+ assertThat(currentSchema.fields().get(3).name()).isEqualTo("email");
+ assertThat(currentSchema.fields().get(3).type()).isEqualTo("string");
+ }
+
+ @Test
+    @DisplayName("Test reading metadata with Iceberg table partitioning evolution")
+ void testReadMetadataWithIcebergPartitioningEvolution() throws Exception {
+ // Create initial unpartitioned Iceberg table
+        Table icebergTable = createBasicIcebergTable("partition_evolution_table");
+
+ // Read initial metadata
+        IcebergMetadata initialMetadata = readIcebergMetadata("partition_evolution_table");
+        assertThat(initialMetadata.partitionSpecs()).hasSize(1);
+        assertThat(initialMetadata.partitionSpecs().get(0).fields()).isEmpty(); // Unpartitioned
+
+ // Evolve partitioning by adding a partition
+ icebergTable.updateSpec().addField("name").commit();
+
+ // Read metadata after partitioning evolution
+        IcebergMetadata evolvedMetadata = readIcebergMetadata("partition_evolution_table");
+
+ // Verify partitioning evolution
+ assertThat(evolvedMetadata.partitionSpecs())
+ .hasSize(2); // Should have 2 partition specs now
+ assertThat(evolvedMetadata.defaultSpecId())
+ .isEqualTo(1); // Default spec should be the new one
+
+ // Verify the current partition spec has the new field
+ IcebergPartitionSpec currentSpec =
+ evolvedMetadata.partitionSpecs().stream()
+                        .filter(spec -> spec.specId() == evolvedMetadata.defaultSpecId())
+ .findFirst()
+ .orElseThrow();
+        assertThat(currentSpec.fields()).hasSize(1);
+        assertThat(currentSpec.fields().get(0).name()).isEqualTo("name");
+        assertThat(currentSpec.fields().get(0).transform()).isEqualTo("identity");
+ }
+
+ @Test
+ @DisplayName("Test FORMAT_VERSION_V3 table")
+ void testFormatVersionV3Table() throws Exception {
+ // Create a v3 format version Iceberg table
+ Table icebergTable = createIcebergTableV3("v3_snapshot_table");
+        TableMetadata base = ((HasTableOperations) icebergTable).operations().current();
+        ((HasTableOperations) icebergTable)
+                .operations()
+                .commit(base, TableMetadata.buildFrom(base).enableRowLineage().build());
+
+ // Read metadata using Paimon's IcebergMetadata
+        IcebergMetadata paimonIcebergMetadata = readIcebergMetadata(icebergTable);
+
+ // Verify basic properties
+ assertThat(paimonIcebergMetadata.formatVersion()).isEqualTo(3);
+ assertThat(paimonIcebergMetadata.tableUuid()).isNotNull();
+ assertThat(paimonIcebergMetadata.location()).isNotNull();
+ assertThat(paimonIcebergMetadata.currentSchemaId()).isEqualTo(0);
+ assertThat(paimonIcebergMetadata.defaultSpecId()).isEqualTo(0);
+ assertThat(paimonIcebergMetadata.defaultSortOrderId()).isEqualTo(0);
+
+ // Verify schema
+ assertThat(paimonIcebergMetadata.schemas()).hasSize(1);
+ IcebergSchema schema = paimonIcebergMetadata.schemas().get(0);
+ assertThat(schema.fields()).hasSize(3);
+
+ // Verify field details
+ assertThat(schema.fields().get(0).name()).isEqualTo("id");
+ assertThat(schema.fields().get(0).type()).isEqualTo("int");
+ assertThat(schema.fields().get(0).required()).isTrue();
+
+ assertThat(schema.fields().get(1).name()).isEqualTo("name");
+ assertThat(schema.fields().get(1).type()).isEqualTo("string");
+ assertThat(schema.fields().get(1).required()).isTrue();
+
+ assertThat(schema.fields().get(2).name()).isEqualTo("age");
+ assertThat(schema.fields().get(2).type()).isEqualTo("int");
+ assertThat(schema.fields().get(2).required()).isFalse();
+
+ // Verify partition specs (should be unpartitioned)
+        assertThat(paimonIcebergMetadata.partitionSpecs()).hasSize(1);
+        assertThat(paimonIcebergMetadata.partitionSpecs().get(0).fields()).isEmpty();
+
+ // Perform first append operation
+ icebergTable
+ .newFastAppend()
+ .appendFile(
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-v3.parquet")
+ .withFileSizeInBytes(100)
+ .withRecordCount(10)
+ .build())
+ .commit();
+
+ // Read metadata after first operation
+ paimonIcebergMetadata = readIcebergMetadata("v3_snapshot_table");
+
+ // Verify snapshots after first operation
+ assertThat(paimonIcebergMetadata.snapshots()).hasSize(1);
+ assertThat(paimonIcebergMetadata.currentSnapshotId()).isNotNull();
+
+ IcebergSnapshot snapshot = paimonIcebergMetadata.snapshots().get(0);
+ assertThat(snapshot.parentSnapshotId()).isNull();
+
+ assertThat(snapshot.firstRowId()).isEqualTo(0L);
+ assertThat(snapshot.addedRows()).isEqualTo(10L);
+
+ // Verify other snapshot properties
+ assertThat(snapshot.snapshotId()).isNotNull();
+ assertThat(snapshot.sequenceNumber()).isEqualTo(1L);
+ assertThat(snapshot.timestampMs()).isGreaterThan(0);
+ assertThat(snapshot.schemaId()).isEqualTo(0);
+
+ // Perform second append operation
+ icebergTable
+ .newFastAppend()
+ .appendFile(
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-v3-2.parquet")
+ .withFileSizeInBytes(200)
+ .withRecordCount(20)
+ .build())
+ .commit();
+
+ // Read metadata after second operation
+        IcebergMetadata paimonIcebergMetadata2 = readIcebergMetadata("v3_snapshot_table");
+
+ // Verify second snapshot
+ assertThat(paimonIcebergMetadata2.snapshots()).hasSize(2);
+        assertThat(paimonIcebergMetadata2.currentSnapshotId())
+                .isEqualTo(paimonIcebergMetadata2.snapshots().get(1).snapshotId());
+        assertThat(paimonIcebergMetadata2.snapshots().get(1).parentSnapshotId())
+                .isEqualTo(paimonIcebergMetadata2.snapshots().get(0).snapshotId());
+
+        // Verify snapshot sequence numbers
+        assertThat(paimonIcebergMetadata2.snapshots().get(0).sequenceNumber()).isEqualTo(1L);
+        assertThat(paimonIcebergMetadata2.snapshots().get(1).sequenceNumber()).isEqualTo(2L);
+ }
+
+    /** Helper method to create a basic Iceberg table with simple schema. */
+    private Table createBasicIcebergTable(String tableName) {
+        TableIdentifier identifier = TableIdentifier.of("testdb", tableName);
+        Schema schema =
+                new Schema(
+                        Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                        Types.NestedField.required(2, "name", Types.StringType.get()),
+                        Types.NestedField.optional(3, "age", Types.IntegerType.get()));
+        return restCatalog.buildTable(identifier, schema).create();
+    }
+
+    /** Helper method to create an Iceberg table with partitioning. */
+    private Table createPartitionedIcebergTable(String tableName) {
+        TableIdentifier identifier = TableIdentifier.of("testdb", tableName);
+        Schema schema =
+                new Schema(
+                        Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                        Types.NestedField.required(2, "name", Types.StringType.get()),
+                        Types.NestedField.required(3, "department", Types.StringType.get()),
+                        Types.NestedField.required(4, "salary", Types.DoubleType.get()));
+        PartitionSpec partitionSpec =
+                PartitionSpec.builderFor(schema).identity("department").build();
+        return restCatalog.buildTable(identifier, schema).withPartitionSpec(partitionSpec).create();
+    }
+
+    /** Helper method to create an Iceberg table with sort order. */
+    private Table createSortedIcebergTable(String tableName) {
+        TableIdentifier identifier = TableIdentifier.of("testdb", tableName);
+        Schema schema =
+                new Schema(
+                        Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                        Types.NestedField.required(2, "name", Types.StringType.get()),
+                        Types.NestedField.required(3, "score", Types.DoubleType.get()));
+        SortOrder sortOrder = SortOrder.builderFor(schema).asc("score").desc("id").build();
+        return restCatalog.buildTable(identifier, schema).withSortOrder(sortOrder).create();
+    }
+
+    /** Helper method to create an Iceberg table with FORMAT_VERSION_V3. */
+    private Table createIcebergTableV3(String tableName) {
+        TableIdentifier identifier = TableIdentifier.of("testdb", tableName);
+        Schema schema =
+                new Schema(
+                        Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                        Types.NestedField.required(2, "name", Types.StringType.get()),
+                        Types.NestedField.optional(3, "age", Types.IntegerType.get()));
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("write.format.default", "parquet");
+        properties.put("write.parquet.compression-codec", "snappy");
+        properties.put("format-version", "3");
+
+        return restCatalog.buildTable(identifier, schema).withProperties(properties).create();
+    }
+
+    /** Helper method to read Iceberg metadata using Paimon's IcebergMetadata. */
+    private IcebergMetadata readIcebergMetadata(String tableName) throws Exception {
+        TableIdentifier identifier = TableIdentifier.of("testdb", tableName);
+        Table icebergTable = restCatalog.loadTable(identifier);
+        return readIcebergMetadata(icebergTable);
+    }
+
+    private IcebergMetadata readIcebergMetadata(Table icebergTable) throws Exception {
+        String metaFileLocation = TableUtil.metadataFileLocation(icebergTable);
+        Path metaFilePath = new Path(metaFileLocation);
+        Options options = new Options();
+        FileIO fileIO = FileIO.get(metaFilePath, CatalogContext.create(options));
+        return IcebergMetadata.fromPath(fileIO, metaFilePath);
+    }
+}
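
For context on the v3 assertions in testFormatVersionV3Table above: under Iceberg v3 row lineage, a snapshot's first-row-id is the table's next unassigned row id and added-rows is the number of records the commit adds. Worked through the two appends in the test (the second snapshot's values are inferred from the Iceberg v3 spec, not asserted by the test):

    long firstRowId1 = 0L;                       // first append starts at row id 0 (asserted)
    long addedRows1 = 10L;                       // 10 records appended (asserted)
    long firstRowId2 = firstRowId1 + addedRows1; // second append starts at 10 (inferred)
    long addedRows2 = 20L;                       // 20 records appended (inferred)
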
diff --git a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
index 6eebe9f1e6..e52ac6e06b 100644
--- a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
+++ b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
@@ -608,6 +609,70 @@ public class IcebergRestMetadataCommitterTest {
commit.close();
}
+    @Test
+    public void testParentSnapshotIdTracking() throws Exception {
+        // create and write with paimon client
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        1,
+                        randomFormat(),
+                        Collections.emptyMap());
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        // First commit - should have null parent snapshot ID
+        write.write(GenericRow.of(1, 10));
+        write.write(GenericRow.of(2, 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        FileIO fileIO = table.fileIO();
+        IcebergMetadata metadata1 =
+                IcebergMetadata.fromPath(
+                        fileIO, new Path(catalogTableMetadataPath(table), "v1.metadata.json"));
+        assertThat(metadata1.snapshots()).hasSize(1);
+        assertThat(metadata1.snapshots().get(0).parentSnapshotId()).isNull();
+        assertThat(metadata1.snapshots().get(0).snapshotId()).isEqualTo(1);
+
+        // Second commit - should have parent snapshot ID pointing to first snapshot
+        write.write(GenericRow.of(1, 11));
+        write.write(GenericRow.of(3, 30));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        IcebergMetadata metadata2 =
+                IcebergMetadata.fromPath(
+                        fileIO, new Path(catalogTableMetadataPath(table), "v2.metadata.json"));
+        assertThat(metadata2.snapshots()).hasSize(2);
+        // The last snapshot should have parent pointing to the previous snapshot
+        IcebergSnapshot lastSnapshot = metadata2.snapshots().get(metadata2.snapshots().size() - 1);
+        assertThat(lastSnapshot.parentSnapshotId()).isEqualTo(1L);
+        assertThat(lastSnapshot.snapshotId()).isEqualTo(2);
+
+        // Third commit - should have parent snapshot ID pointing to second snapshot
+        write.write(GenericRow.of(2, 21));
+        write.write(GenericRow.of(4, 40));
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        IcebergMetadata metadata3 =
+                IcebergMetadata.fromPath(
+                        fileIO, new Path(catalogTableMetadataPath(table), "v3.metadata.json"));
+        assertThat(metadata3.snapshots()).hasSize(3);
+        // The last snapshot should have parent pointing to the previous snapshot
+        IcebergSnapshot lastSnapshot3 = metadata3.snapshots().get(metadata3.snapshots().size() - 1);
+        assertThat(lastSnapshot3.parentSnapshotId()).isEqualTo(2L);
+        assertThat(lastSnapshot3.snapshotId()).isEqualTo(3);
+
+        write.close();
+        commit.close();
+    }
+
private static class TestRecord {
private final BinaryRow partition;
private final GenericRow record;