This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f83ecf4dd [core] Introduce IcebergCommitCallback to create Iceberg 
metadata after commit (#3731)
f83ecf4dd is described below

commit f83ecf4dd7c98cec37fe5e30bad69b77a5b38d7c
Author: tsreaper <[email protected]>
AuthorDate: Fri Jul 12 10:35:13 2024 +0800

    [core] Introduce IcebergCommitCallback to create Iceberg metadata after 
commit (#3731)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  12 +
 paimon-core/pom.xml                                |  14 +
 .../paimon/iceberg/IcebergCommitCallback.java      | 248 +++++++++++++++
 .../apache/paimon/iceberg/IcebergPathFactory.java  |  93 ++++++
 .../iceberg/manifest/IcebergConversions.java       |  81 +++++
 .../iceberg/manifest/IcebergDataFileMeta.java      | 144 +++++++++
 .../manifest/IcebergDataFileMetaSerializer.java    |  61 ++++
 .../iceberg/manifest/IcebergManifestEntry.java     | 136 ++++++++
 .../manifest/IcebergManifestEntrySerializer.java   |  57 ++++
 .../iceberg/manifest/IcebergManifestFile.java      | 189 +++++++++++
 .../iceberg/manifest/IcebergManifestFileMeta.java  | 228 ++++++++++++++
 .../IcebergManifestFileMetaSerializer.java         |  94 ++++++
 .../iceberg/manifest/IcebergManifestList.java      |  48 +++
 .../iceberg/manifest/IcebergPartitionSummary.java  |  97 ++++++
 .../IcebergPartitionSummarySerializer.java         |  46 +++
 .../paimon/iceberg/metadata/IcebergDataField.java  | 159 ++++++++++
 .../paimon/iceberg/metadata/IcebergMetadata.java   | 317 +++++++++++++++++++
 .../iceberg/metadata/IcebergPartitionField.java    | 119 +++++++
 .../iceberg/metadata/IcebergPartitionSpec.java     |  88 ++++++
 .../paimon/iceberg/metadata/IcebergSchema.java     | 109 +++++++
 .../paimon/iceberg/metadata/IcebergSnapshot.java   | 130 ++++++++
 .../iceberg/metadata/IcebergSnapshotSummary.java   |  71 +++++
 .../paimon/iceberg/metadata/IcebergSortOrder.java  |  90 ++++++
 .../paimon/table/AbstractFileStoreTable.java       |  12 +-
 .../paimon/iceberg/IcebergCompatibilityTest.java   | 348 +++++++++++++++++++++
 .../apache/paimon/format/avro/AvroFileFormat.java  |  12 +-
 .../paimon/format/avro/AvroSchemaConverter.java    |  28 +-
 28 files changed, 3027 insertions(+), 10 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index ed9fdb53c..81f80c0fd 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -417,6 +417,12 @@ Mainly to resolve data skew on primary keys. We recommend 
starting with 64 mb wh
             <td><p>Enum</p></td>
             <td>Specify the merge engine for table with primary key.<br /><br 
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last 
row.</li><li>"partial-update": Partial update non-null 
fields.</li><li>"aggregation": Aggregate fields with same primary 
key.</li><li>"first-row": De-duplicate and keep the first row.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>metadata.iceberg-compatible</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>When set to true, produce Iceberg metadata after a snapshot is 
committed, so that Iceberg readers can read Paimon's raw files.</td>
+        </tr>
         <tr>
             <td><h5>metadata.stats-mode</h5></td>
             <td style="word-wrap: break-word;">"truncate(16)"</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 3c286ecd2..69c236b9d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1252,6 +1252,14 @@ public class CoreOptions implements Serializable {
                                     + ChangelogProducer.LOOKUP.name()
                                     + ", commit will wait for changelog 
generation by lookup.");
 
+    public static final ConfigOption<Boolean> METADATA_ICEBERG_COMPATIBLE =
+            key("metadata.iceberg-compatible")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When set to true, produce Iceberg metadata after 
a snapshot is committed, "
+                                    + "so that Iceberg readers can read 
Paimon's raw files.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -1977,6 +1985,10 @@ public class CoreOptions implements Serializable {
                 && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
     }
 
+    public boolean metadataIcebergCompatible() {
+        return options.get(METADATA_ICEBERG_COMPATIBLE);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 533a1cd3d..a2591bc6b 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -190,6 +190,20 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>1.5.2</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>1.5.2</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
new file mode 100644
index 000000000..95dae5569
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta;
+import org.apache.paimon.iceberg.manifest.IcebergManifestEntry;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta;
+import org.apache.paimon.iceberg.manifest.IcebergManifestList;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.iceberg.metadata.IcebergSchema;
+import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
+import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg 
readers can read
+ * Paimon's {@link RawFile}.
+ */
+public class IcebergCommitCallback implements CommitCallback {
+
+    // see org.apache.iceberg.hadoop.Util
+    private static final String VERSION_HINT_FILENAME = "version-hint.text";
+
+    private final FileStoreTable table;
+    private final String commitUser;
+    private final IcebergPathFactory pathFactory;
+
+    private final IcebergManifestFile manifestFile;
+    private final IcebergManifestList manifestList;
+
+    public IcebergCommitCallback(FileStoreTable table, String commitUser) {
+        this.table = table;
+        this.commitUser = commitUser;
+        this.pathFactory = new IcebergPathFactory(table.location());
+
+        RowType partitionType = table.schema().logicalPartitionType();
+        RowType entryType = IcebergManifestEntry.schema(partitionType);
+        Options manifestFileAvroOptions = Options.fromMap(table.options());
+        // 
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestReader.java
+        manifestFileAvroOptions.set(
+                "avro.row-name-mapping",
+                "org.apache.paimon.avro.generated.record:manifest_entry,"
+                        + "manifest_entry_data_file:r2,"
+                        + "r2_partition:r102");
+        FileFormat manifestFileAvro = 
FileFormat.getFileFormat(manifestFileAvroOptions, "avro");
+        this.manifestFile =
+                new IcebergManifestFile(
+                        table.fileIO(),
+                        partitionType,
+                        manifestFileAvro.createReaderFactory(entryType),
+                        manifestFileAvro.createWriterFactory(entryType),
+                        table.coreOptions().manifestCompression(),
+                        pathFactory.manifestFileFactory(),
+                        table.coreOptions().manifestTargetSize());
+
+        Options manifestListAvroOptions = Options.fromMap(table.options());
+        // 
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestLists.java
+        manifestListAvroOptions.set(
+                "avro.row-name-mapping",
+                "org.apache.paimon.avro.generated.record:manifest_file,"
+                        + "manifest_file_partitions:r508");
+        FileFormat manifestListAvro = 
FileFormat.getFileFormat(manifestListAvroOptions, "avro");
+        this.manifestList =
+                new IcebergManifestList(
+                        table.fileIO(),
+                        
manifestListAvro.createReaderFactory(IcebergManifestFileMeta.schema()),
+                        
manifestListAvro.createWriterFactory(IcebergManifestFileMeta.schema()),
+                        table.coreOptions().manifestCompression(),
+                        pathFactory.manifestListFactory());
+    }
+
+    @Override
+    public void call(List<ManifestCommittable> committables) {
+        for (ManifestCommittable committable : committables) {
+            try {
+                commitMetadata(committable);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+    }
+
+    private void commitMetadata(ManifestCommittable committable) throws 
IOException {
+        Pair<Long, Long> pair = 
getCurrentAndBaseSnapshotIds(committable.identifier());
+        long currentSnapshot = pair.getLeft();
+        Long baseSnapshot = pair.getRight();
+
+        createMetadataWithoutBase(currentSnapshot);
+    }
+
+    private Pair<Long, Long> getCurrentAndBaseSnapshotIds(long 
commitIdentifier) {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        List<Snapshot> currentSnapshots =
+                snapshotManager.findSnapshotsForIdentifiers(
+                        commitUser, 
Collections.singletonList(commitIdentifier));
+        Preconditions.checkArgument(
+                currentSnapshots.size() == 1,
+                "Cannot find snapshot with user {} and identifier {}",
+                commitUser,
+                commitIdentifier);
+        long currentSnapshotId = currentSnapshots.get(0).id();
+
+        long earliest =
+                Preconditions.checkNotNull(
+                        snapshotManager.earliestSnapshotId(),
+                        "Cannot determine earliest snapshot ID. This is 
unexpected.");
+        Long baseSnapshotId = null;
+        for (long id = currentSnapshotId - 1; id >= earliest; id--) {
+            try {
+                Snapshot snapshot = snapshotManager.snapshot(id);
+                if (!snapshot.commitUser().equals(commitUser)
+                        || snapshot.commitIdentifier() < commitIdentifier) {
+                    baseSnapshotId = id;
+                    break;
+                }
+            } catch (Exception ignore) {
+                break;
+            }
+        }
+
+        return Pair.of(currentSnapshotId, baseSnapshotId);
+    }
+
+    private void createMetadataWithoutBase(long snapshotId) throws IOException 
{
+        SnapshotReader snapshotReader = 
table.newSnapshotReader().withSnapshot(snapshotId);
+        Iterator<IcebergManifestEntry> entryIterator =
+                snapshotReader.read().dataSplits().stream()
+                        .filter(DataSplit::rawConvertible)
+                        .flatMap(s -> dataSplitToDataFileMeta(s).stream())
+                        .map(
+                                m ->
+                                        new IcebergManifestEntry(
+                                                
IcebergManifestEntry.Status.ADDED,
+                                                snapshotId,
+                                                snapshotId,
+                                                snapshotId,
+                                                m))
+                        .iterator();
+        List<IcebergManifestFileMeta> manifestFileMetas =
+                manifestFile.rollingWrite(entryIterator, snapshotId);
+        String manifestListFileName = 
manifestList.writeWithoutRolling(manifestFileMetas);
+
+        List<IcebergPartitionField> partitionFields =
+                getPartitionFields(table.schema().logicalPartitionType());
+        int schemaId = (int) table.schema().id();
+        IcebergSnapshot snapshot =
+                new IcebergSnapshot(
+                        snapshotId,
+                        snapshotId,
+                        System.currentTimeMillis(),
+                        new 
IcebergSnapshotSummary(IcebergSnapshotSummary.OPERATION_APPEND),
+                        
pathFactory.toManifestListPath(manifestListFileName).toString(),
+                        schemaId);
+
+        String tableUuid = UUID.randomUUID().toString();
+        IcebergMetadata metadata =
+                new IcebergMetadata(
+                        tableUuid,
+                        table.location().toString(),
+                        snapshotId,
+                        table.schema().highestFieldId(),
+                        Collections.singletonList(new 
IcebergSchema(table.schema())),
+                        schemaId,
+                        Collections.singletonList(new 
IcebergPartitionSpec(partitionFields)),
+                        partitionFields.stream()
+                                .mapToInt(IcebergPartitionField::fieldId)
+                                .max()
+                                .orElse(
+                                        // not sure why, this is a result 
tested by hand
+                                        IcebergPartitionField.FIRST_FIELD_ID - 
1),
+                        Collections.singletonList(snapshot),
+                        (int) snapshotId);
+        
table.fileIO().tryToWriteAtomic(pathFactory.toMetadataPath(snapshotId), 
metadata.toJson());
+        table.fileIO()
+                .overwriteFileUtf8(
+                        new Path(pathFactory.metadataDirectory(), 
VERSION_HINT_FILENAME),
+                        String.valueOf(snapshotId));
+    }
+
+    private List<IcebergDataFileMeta> dataSplitToDataFileMeta(DataSplit 
dataSplit) {
+        List<IcebergDataFileMeta> result = new ArrayList<>();
+        for (RawFile rawFile : dataSplit.convertToRawFiles().get()) {
+            result.add(
+                    new IcebergDataFileMeta(
+                            IcebergDataFileMeta.Content.DATA,
+                            rawFile.path(),
+                            rawFile.format(),
+                            dataSplit.partition(),
+                            rawFile.rowCount(),
+                            rawFile.fileSize()));
+        }
+        return result;
+    }
+
+    private List<IcebergPartitionField> getPartitionFields(RowType 
partitionType) {
+        List<IcebergPartitionField> result = new ArrayList<>();
+        int fieldId = IcebergPartitionField.FIRST_FIELD_ID;
+        for (DataField field : partitionType.getFields()) {
+            result.add(new IcebergPartitionField(field, fieldId));
+            fieldId++;
+        }
+        return result;
+    }
+
+    @Override
+    public void close() throws Exception {}
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
new file mode 100644
index 000000000..8b5680952
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.PathFactory;
+
+import java.util.UUID;
+
+/** Path factory for Iceberg metadata files. */
+public class IcebergPathFactory {
+
+    private final Path root;
+    private final String uuid;
+
+    private int manifestFileCount;
+    private int manifestListCount;
+
+    public IcebergPathFactory(Path root) {
+        this.root = root;
+        this.uuid = UUID.randomUUID().toString();
+    }
+
+    public Path metadataDirectory() {
+        return new Path(root, "metadata");
+    }
+
+    public Path newManifestFile() {
+        manifestFileCount++;
+        return toManifestFilePath(uuid + "-m" + manifestFileCount + ".avro");
+    }
+
+    public Path toManifestFilePath(String manifestFileName) {
+        return new Path(metadataDirectory(), manifestFileName);
+    }
+
+    public Path newManifestListFile() {
+        manifestListCount++;
+        return toManifestListPath("snap-" + manifestListCount + "-" + uuid + 
".avro");
+    }
+
+    public Path toManifestListPath(String manifestListName) {
+        return new Path(metadataDirectory(), manifestListName);
+    }
+
+    public Path toMetadataPath(long snapshotId) {
+        return new Path(metadataDirectory(), 
String.format("v%d.metadata.json", snapshotId));
+    }
+
+    public PathFactory manifestFileFactory() {
+        return new PathFactory() {
+            @Override
+            public Path newPath() {
+                return newManifestFile();
+            }
+
+            @Override
+            public Path toPath(String fileName) {
+                return toManifestFilePath(fileName);
+            }
+        };
+    }
+
+    public PathFactory manifestListFactory() {
+        return new PathFactory() {
+            @Override
+            public Path newPath() {
+                return newManifestListFile();
+            }
+
+            @Override
+            public Path toPath(String fileName) {
+                return toManifestListPath(fileName);
+            }
+        };
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
new file mode 100644
index 000000000..fb83dd52c
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.types.DataType;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Conversions between Java object and bytes.
+ *
+ * <p>See <a 
href="https://iceberg.apache.org/spec/#binary-single-value-serialization">Iceberg
+ * spec</a>.
+ */
+public class IcebergConversions {
+
+    private IcebergConversions() {}
+
+    private static final ThreadLocal<CharsetEncoder> ENCODER =
+            ThreadLocal.withInitial(StandardCharsets.UTF_8::newEncoder);
+
+    public static ByteBuffer toByteBuffer(DataType type, Object value) {
+        switch (type.getTypeRoot()) {
+            case BOOLEAN:
+                return ByteBuffer.allocate(1).put(0, (Boolean) value ? (byte) 
0x01 : (byte) 0x00);
+            case INTEGER:
+            case DATE:
+                return 
ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(0, (int) value);
+            case BIGINT:
+                return ByteBuffer.allocate(8)
+                        .order(ByteOrder.LITTLE_ENDIAN)
+                        .putLong(0, (long) value);
+            case FLOAT:
+                return ByteBuffer.allocate(4)
+                        .order(ByteOrder.LITTLE_ENDIAN)
+                        .putFloat(0, (float) value);
+            case DOUBLE:
+                return ByteBuffer.allocate(8)
+                        .order(ByteOrder.LITTLE_ENDIAN)
+                        .putDouble(0, (double) value);
+            case CHAR:
+            case VARCHAR:
+                CharBuffer buffer = CharBuffer.wrap(value.toString());
+                try {
+                    return ENCODER.get().encode(buffer);
+                } catch (CharacterCodingException e) {
+                    throw new RuntimeException("Failed to encode value as 
UTF-8: " + value, e);
+                }
+            case BINARY:
+            case VARBINARY:
+                return ByteBuffer.wrap((byte[]) value);
+            case DECIMAL:
+                Decimal decimal = (Decimal) value;
+                return ByteBuffer.wrap((decimal.toUnscaledBytes()));
+            default:
+                throw new UnsupportedOperationException("Cannot serialize 
type: " + type);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
new file mode 100644
index 000000000..292d8488d
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Iceberg data file meta.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#manifests">Iceberg 
spec</a>.
+ */
+public class IcebergDataFileMeta {
+
+    /** See Iceberg <code>data_file</code> struct <code>content</code> field. 
*/
+    public enum Content {
+        DATA(0),
+        POSITION_DELETES(1),
+        EQUALITY_DELETES(2);
+
+        private final int id;
+
+        Content(int id) {
+            this.id = id;
+        }
+
+        public int id() {
+            return id;
+        }
+
+        public static Content fromId(int id) {
+            switch (id) {
+                case 0:
+                    return DATA;
+                case 1:
+                    return POSITION_DELETES;
+                case 2:
+                    return EQUALITY_DELETES;
+            }
+            throw new IllegalArgumentException("Unknown manifest content: " + 
id);
+        }
+    }
+
+    private final Content content;
+    private final String filePath;
+    private final String fileFormat;
+    private final BinaryRow partition;
+    private final long recordCount;
+    private final long fileSizeInBytes;
+
+    public IcebergDataFileMeta(
+            Content content,
+            String filePath,
+            String fileFormat,
+            BinaryRow partition,
+            long recordCount,
+            long fileSizeInBytes) {
+        this.content = content;
+        this.filePath = filePath;
+        this.fileFormat = fileFormat;
+        this.partition = partition;
+        this.recordCount = recordCount;
+        this.fileSizeInBytes = fileSizeInBytes;
+    }
+
+    public Content content() {
+        return content;
+    }
+
+    public String filePath() {
+        return filePath;
+    }
+
+    public String fileFormat() {
+        return fileFormat;
+    }
+
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    public long recordCount() {
+        return recordCount;
+    }
+
+    public long fileSizeInBytes() {
+        return fileSizeInBytes;
+    }
+
+    public static RowType schema(RowType partitionType) {
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(134, "content", DataTypes.INT().notNull()));
+        fields.add(new DataField(100, "file_path", 
DataTypes.STRING().notNull()));
+        fields.add(new DataField(101, "file_format", 
DataTypes.STRING().notNull()));
+        fields.add(new DataField(102, "partition", partitionType));
+        fields.add(new DataField(103, "record_count", 
DataTypes.BIGINT().notNull()));
+        fields.add(new DataField(104, "file_size_in_bytes", 
DataTypes.BIGINT().notNull()));
+        return new RowType(fields);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IcebergDataFileMeta that = (IcebergDataFileMeta) o;
+        return content == that.content
+                && recordCount == that.recordCount
+                && fileSizeInBytes == that.fileSizeInBytes
+                && Objects.equals(filePath, that.filePath)
+                && Objects.equals(fileFormat, that.fileFormat)
+                && Objects.equals(partition, that.partition);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(content, filePath, fileFormat, partition, 
recordCount, fileSizeInBytes);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
new file mode 100644
index 000000000..b4aa281e6
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectSerializer;
+
+/** Serializer for {@link IcebergDataFileMeta}. */
+public class IcebergDataFileMetaSerializer extends 
ObjectSerializer<IcebergDataFileMeta> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final InternalRowSerializer partSerializer;
+
+    public IcebergDataFileMetaSerializer(RowType partitionType) {
+        super(IcebergDataFileMeta.schema(partitionType));
+        this.partSerializer = new InternalRowSerializer(partitionType);
+    }
+
+    @Override
+    public InternalRow toRow(IcebergDataFileMeta file) {
+        return GenericRow.of(
+                file.content().id(),
+                BinaryString.fromString(file.filePath()),
+                BinaryString.fromString(file.fileFormat()),
+                file.partition(),
+                file.recordCount(),
+                file.fileSizeInBytes());
+    }
+
+    @Override
+    public IcebergDataFileMeta fromRow(InternalRow row) {
+        return new IcebergDataFileMeta(
+                IcebergDataFileMeta.Content.fromId(row.getInt(0)),
+                row.getString(1).toString(),
+                row.getString(2).toString(),
+                partSerializer.toBinaryRow(row.getRow(3, 
partSerializer.getArity())).copy(),
+                row.getLong(4),
+                row.getLong(5));
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java
new file mode 100644
index 000000000..6b76fb757
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Entry of an Iceberg manifest file.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#manifests";>Iceberg 
spec</a>.
+ */
+public class IcebergManifestEntry {
+
+    /** See Iceberg <code>manifest_entry</code> struct <code>status</code> 
field. */
+    public enum Status {
+        EXISTING(0),
+        ADDED(1),
+        DELETED(2);
+
+        private final int id;
+
+        Status(int id) {
+            this.id = id;
+        }
+
+        public int id() {
+            return id;
+        }
+
+        public static Status fromId(int id) {
+            switch (id) {
+                case 0:
+                    return EXISTING;
+                case 1:
+                    return ADDED;
+                case 2:
+                    return DELETED;
+            }
+            throw new IllegalArgumentException("Unknown manifest content: " + 
id);
+        }
+    }
+
+    private final Status status;
+    private final long snapshotId;
+    // sequenceNumber indicates when the records in the data files are 
written. It might be smaller
+    // than fileSequenceNumber.
+    // For example, when a file with sequenceNumber 3 and another file with 
sequenceNumber 5 are
+    // compacted into one file during snapshot 6, the compacted file will have 
sequenceNumber =
+    // max(3, 5) = 5, and fileSequenceNumber = 6.
+    private final long sequenceNumber;
+    private final long fileSequenceNumber;
+    private final IcebergDataFileMeta dataFile;
+
+    public IcebergManifestEntry(
+            Status status,
+            long snapshotId,
+            long sequenceNumber,
+            long fileSequenceNumber,
+            IcebergDataFileMeta dataFile) {
+        this.status = status;
+        this.snapshotId = snapshotId;
+        this.sequenceNumber = sequenceNumber;
+        this.fileSequenceNumber = fileSequenceNumber;
+        this.dataFile = dataFile;
+    }
+
+    public Status status() {
+        return status;
+    }
+
+    public long snapshotId() {
+        return snapshotId;
+    }
+
+    public long sequenceNumber() {
+        return sequenceNumber;
+    }
+
+    public long fileSequenceNumber() {
+        return fileSequenceNumber;
+    }
+
+    public IcebergDataFileMeta file() {
+        return dataFile;
+    }
+
+    public static RowType schema(RowType partitionType) {
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "status", DataTypes.INT().notNull()));
+        fields.add(new DataField(1, "snapshot_id", DataTypes.BIGINT()));
+        fields.add(new DataField(3, "sequence_number", DataTypes.BIGINT()));
+        fields.add(new DataField(4, "file_sequence_number", 
DataTypes.BIGINT()));
+        fields.add(
+                new DataField(2, "data_file", 
IcebergDataFileMeta.schema(partitionType).notNull()));
+        return new RowType(fields);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IcebergManifestEntry that = (IcebergManifestEntry) o;
+        return status == that.status && Objects.equals(dataFile, 
that.dataFile);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(status, dataFile);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
new file mode 100644
index 000000000..d93456c3f
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectSerializer;
+
+/** Serializer for {@link IcebergManifestEntry}. */
+public class IcebergManifestEntrySerializer extends 
ObjectSerializer<IcebergManifestEntry> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IcebergDataFileMetaSerializer fileSerializer;
+
+    public IcebergManifestEntrySerializer(RowType partitionType) {
+        super(IcebergManifestEntry.schema(partitionType));
+        this.fileSerializer = new IcebergDataFileMetaSerializer(partitionType);
+    }
+
+    @Override
+    public InternalRow toRow(IcebergManifestEntry entry) {
+        return GenericRow.of(
+                entry.status().id(),
+                entry.snapshotId(),
+                entry.sequenceNumber(),
+                entry.fileSequenceNumber(),
+                fileSerializer.toRow(entry.file()));
+    }
+
+    @Override
+    public IcebergManifestEntry fromRow(InternalRow row) {
+        return new IcebergManifestEntry(
+                IcebergManifestEntry.Status.fromId(row.getInt(0)),
+                row.getLong(1),
+                row.getLong(2),
+                row.getLong(3),
+                fileSerializer.fromRow(row.getRow(4, 
fileSerializer.numFields())));
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
new file mode 100644
index 000000000..d4c363b4c
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleStatsCollector;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.SingleFileWriter;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.PathFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.paimon.iceberg.manifest.IcebergConversions.toByteBuffer;
+
+/**
+ * This file includes several Iceberg {@link ManifestEntry}s, representing the additional changes
+ * since last snapshot.
+ *
+ * <p>Entries are {@link IcebergManifestEntry} objects. While writing, per-status file and row
+ * counts, the minimum sequence number and partition statistics are accumulated and returned as
+ * an {@link IcebergManifestFileMeta} for each written file.
+ */
+public class IcebergManifestFile extends ObjectsFile<IcebergManifestEntry> {
+
+    /** Min sequence number reported when a writer has not seen any entry. */
+    private static final long UNASSIGNED_SEQ = -1L;
+
+    private final RowType partitionType;
+    private final FormatWriterFactory writerFactory;
+    // Compression used by the writers created in createWriter; kept here so that the value
+    // passed to the constructor is honoured instead of always using the default.
+    private final String manifestCompression;
+    private final MemorySize targetFileSize;
+
+    /**
+     * @param compression compression for written manifest files; when null, the default value
+     *     of {@link CoreOptions#FILE_COMPRESSION} is used by the writers
+     * @param targetFileSize size handed to the {@link RollingFileWriter} in {@link
+     *     #rollingWrite}
+     */
+    public IcebergManifestFile(
+            FileIO fileIO,
+            RowType partitionType,
+            FormatReaderFactory readerFactory,
+            FormatWriterFactory writerFactory,
+            String compression,
+            PathFactory pathFactory,
+            MemorySize targetFileSize) {
+        super(
+                fileIO,
+                new IcebergManifestEntrySerializer(partitionType),
+                readerFactory,
+                writerFactory,
+                compression,
+                pathFactory,
+                null);
+        this.partitionType = partitionType;
+        this.writerFactory = writerFactory;
+        this.manifestCompression =
+                compression != null ? compression : CoreOptions.FILE_COMPRESSION.defaultValue();
+        this.targetFileSize = targetFileSize;
+    }
+
+    /**
+     * Writes all entries through a {@link RollingFileWriter} configured with the target file
+     * size and returns the meta of every file it produced.
+     *
+     * @param sequenceNumber sequence number given to each underlying writer
+     * @throws RuntimeException wrapping any exception raised while writing or closing
+     */
+    public List<IcebergManifestFileMeta> rollingWrite(
+            Iterator<IcebergManifestEntry> entries, long sequenceNumber) throws IOException {
+        RollingFileWriter<IcebergManifestEntry, IcebergManifestFileMeta> writer =
+                new RollingFileWriter<>(
+                        () -> createWriter(sequenceNumber), targetFileSize.getBytes());
+        try {
+            writer.write(entries);
+            writer.close();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return writer.result();
+    }
+
+    /** Creates a writer for one new manifest file at a fresh path from the path factory. */
+    public SingleFileWriter<IcebergManifestEntry, IcebergManifestFileMeta> createWriter(
+            long sequenceNumber) {
+        return new IcebergManifestEntryWriter(
+                writerFactory, pathFactory.newPath(), manifestCompression, sequenceNumber);
+    }
+
+    /** Writer of a single manifest file that also gathers the statistics for its meta. */
+    private class IcebergManifestEntryWriter
+            extends SingleFileWriter<IcebergManifestEntry, IcebergManifestFileMeta> {
+
+        private final SimpleStatsCollector partitionStatsCollector;
+        private final long sequenceNumber;
+
+        private int addedFilesCount = 0;
+        private int existingFilesCount = 0;
+        private int deletedFilesCount = 0;
+        private long addedRowsCount = 0;
+        private long existingRowsCount = 0;
+        private long deletedRowsCount = 0;
+        // null until the first entry has been written
+        private Long minSequenceNumber = null;
+
+        IcebergManifestEntryWriter(
+                FormatWriterFactory factory,
+                Path path,
+                String fileCompression,
+                long sequenceNumber) {
+            super(
+                    IcebergManifestFile.this.fileIO,
+                    factory,
+                    path,
+                    serializer::toRow,
+                    fileCompression);
+            this.partitionStatsCollector = new SimpleStatsCollector(partitionType);
+            this.sequenceNumber = sequenceNumber;
+        }
+
+        /** Writes the entry, then updates the counters, min sequence number and stats. */
+        @Override
+        public void write(IcebergManifestEntry entry) throws IOException {
+            super.write(entry);
+
+            switch (entry.status()) {
+                case ADDED:
+                    addedFilesCount += 1;
+                    addedRowsCount += entry.file().recordCount();
+                    break;
+                case EXISTING:
+                    existingFilesCount += 1;
+                    existingRowsCount += entry.file().recordCount();
+                    break;
+                case DELETED:
+                    deletedFilesCount += 1;
+                    deletedRowsCount += entry.file().recordCount();
+                    break;
+            }
+
+            if (minSequenceNumber == null || minSequenceNumber > entry.sequenceNumber()) {
+                minSequenceNumber = entry.sequenceNumber();
+            }
+
+            partitionStatsCollector.collect(entry.file().partition());
+        }
+
+        /** Builds the manifest file meta from everything accumulated by {@link #write}. */
+        @Override
+        public IcebergManifestFileMeta result() throws IOException {
+            SimpleColStats[] stats = partitionStatsCollector.extract();
+            List<IcebergPartitionSummary> partitionSummaries = new ArrayList<>();
+            for (int i = 0; i < stats.length; i++) {
+                SimpleColStats fieldStats = stats[i];
+                DataType type = partitionType.getTypeAt(i);
+                partitionSummaries.add(
+                        new IcebergPartitionSummary(
+                                Objects.requireNonNull(fieldStats.nullCount()) > 0,
+                                false, // TODO correct it?
+                                toBound(type, fieldStats.min()),
+                                toBound(type, fieldStats.max())));
+            }
+            return new IcebergManifestFileMeta(
+                    path.toString(),
+                    fileIO.getFileSize(path),
+                    IcebergPartitionSpec.SPEC_ID,
+                    Content.DATA,
+                    sequenceNumber,
+                    minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ,
+                    // NOTE(review): the sequence number is also passed as the added snapshot
+                    // id; presumably both are the same value here, confirm against callers.
+                    sequenceNumber,
+                    addedFilesCount,
+                    existingFilesCount,
+                    deletedFilesCount,
+                    addedRowsCount,
+                    existingRowsCount,
+                    deletedRowsCount,
+                    partitionSummaries);
+        }
+
+        /**
+         * Serializes a partition bound, or returns null when the statistic is absent (for
+         * example when every collected value of the field was null), since the bound fields
+         * of the partition summary are nullable.
+         */
+        private byte[] toBound(DataType type, Object value) {
+            return value == null ? null : toByteBuffer(type, value).array();
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
new file mode 100644
index 000000000..571b24960
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Metadata of an Iceberg manifest file.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#manifest-lists";>Iceberg 
spec</a>.
+ */
+public class IcebergManifestFileMeta {
+
+    /** Content type stored in a manifest file. */
+    public enum Content {
+        DATA(0),
+        DELETES(1);
+
+        private final int id;
+
+        Content(int id) {
+            this.id = id;
+        }
+
+        public int id() {
+            return id;
+        }
+
+        public static Content fromId(int id) {
+            switch (id) {
+                case 0:
+                    return DATA;
+                case 1:
+                    return DELETES;
+            }
+            throw new IllegalArgumentException("Unknown manifest content: " + 
id);
+        }
+    }
+
+    private final String manifestPath;
+    private final long manifestLength;
+    private final int partitionSpecId;
+    private final Content content;
+    private final long sequenceNumber;
+    private final long minSequenceNumber;
+    private final long addedSnapshotId;
+    private final int addedFilesCount;
+    private final int existingFilesCount;
+    private final int deletedFilesCount;
+    private final long addedRowsCount;
+    private final long existingRowsCount;
+    private final long deletedRowsCount;
+    private final List<IcebergPartitionSummary> partitions;
+
+    public IcebergManifestFileMeta(
+            String manifestPath,
+            long manifestLength,
+            int partitionSpecId,
+            Content content,
+            long sequenceNumber,
+            long minSequenceNumber,
+            long addedSnapshotId,
+            int addedFilesCount,
+            int existingFilesCount,
+            int deletedFilesCount,
+            long addedRowsCount,
+            long existingRowsCount,
+            long deletedRowsCount,
+            List<IcebergPartitionSummary> partitions) {
+        this.manifestPath = manifestPath;
+        this.manifestLength = manifestLength;
+        this.partitionSpecId = partitionSpecId;
+        this.content = content;
+        this.sequenceNumber = sequenceNumber;
+        this.minSequenceNumber = minSequenceNumber;
+        this.addedSnapshotId = addedSnapshotId;
+        this.addedFilesCount = addedFilesCount;
+        this.existingFilesCount = existingFilesCount;
+        this.deletedFilesCount = deletedFilesCount;
+        this.addedRowsCount = addedRowsCount;
+        this.existingRowsCount = existingRowsCount;
+        this.deletedRowsCount = deletedRowsCount;
+        this.partitions = partitions;
+    }
+
+    public String manifestPath() {
+        return manifestPath;
+    }
+
+    public long manifestLength() {
+        return manifestLength;
+    }
+
+    public int partitionSpecId() {
+        return partitionSpecId;
+    }
+
+    public Content content() {
+        return content;
+    }
+
+    public long sequenceNumber() {
+        return sequenceNumber;
+    }
+
+    public long minSequenceNumber() {
+        return minSequenceNumber;
+    }
+
+    public long addedSnapshotId() {
+        return addedSnapshotId;
+    }
+
+    public int addedFilesCount() {
+        return addedFilesCount;
+    }
+
+    public int existingFilesCount() {
+        return existingFilesCount;
+    }
+
+    public int deletedFilesCount() {
+        return deletedFilesCount;
+    }
+
+    public long addedRowsCount() {
+        return addedRowsCount;
+    }
+
+    public long existingRowsCount() {
+        return existingRowsCount;
+    }
+
+    public long deletedRowsCount() {
+        return deletedRowsCount;
+    }
+
+    public List<IcebergPartitionSummary> partitions() {
+        return partitions;
+    }
+
+    public static RowType schema() {
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(500, "manifest_path", 
DataTypes.STRING().notNull()));
+        fields.add(new DataField(501, "manifest_length", 
DataTypes.BIGINT().notNull()));
+        fields.add(new DataField(502, "partition_spec_id", 
DataTypes.INT().notNull()));
+        fields.add(new DataField(517, "content", DataTypes.INT().notNull()));
+        fields.add(new DataField(515, "sequence_number", 
DataTypes.BIGINT().notNull()));
+        fields.add(new DataField(516, "min_sequence_number", 
DataTypes.BIGINT().notNull()));
+        fields.add(new DataField(503, "added_snapshot_id", 
DataTypes.BIGINT()));
+        fields.add(new DataField(504, "added_files_count", 
DataTypes.INT().notNull()));
+        fields.add(new DataField(505, "existing_files_count", 
DataTypes.INT().notNull()));
+        fields.add(new DataField(506, "deleted_files_count", 
DataTypes.INT().notNull()));
+        fields.add(new DataField(512, "added_rows_count", 
DataTypes.BIGINT().notNull()));
+        fields.add(new DataField(513, "existing_rows_count", 
DataTypes.BIGINT().notNull()));
+        fields.add(new DataField(514, "deleted_rows_count", 
DataTypes.BIGINT().notNull()));
+        fields.add(
+                new DataField(
+                        508, "partitions", 
DataTypes.ARRAY(IcebergPartitionSummary.schema())));
+        return new RowType(fields);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IcebergManifestFileMeta that = (IcebergManifestFileMeta) o;
+        return Objects.equals(manifestPath, that.manifestPath)
+                && manifestLength == that.manifestLength
+                && partitionSpecId == that.partitionSpecId
+                && content == that.content
+                && sequenceNumber == that.sequenceNumber
+                && minSequenceNumber == that.minSequenceNumber
+                && addedSnapshotId == that.addedSnapshotId
+                && addedFilesCount == that.addedFilesCount
+                && existingFilesCount == that.existingFilesCount
+                && deletedFilesCount == that.deletedFilesCount
+                && addedRowsCount == that.addedRowsCount
+                && existingRowsCount == that.existingRowsCount
+                && deletedRowsCount == that.deletedRowsCount
+                && Objects.equals(partitions, that.partitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                manifestPath,
+                manifestLength,
+                partitionSpecId,
+                content,
+                sequenceNumber,
+                minSequenceNumber,
+                addedSnapshotId,
+                addedFilesCount,
+                existingFilesCount,
+                deletedFilesCount,
+                addedRowsCount,
+                existingRowsCount,
+                deletedRowsCount,
+                partitions);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
new file mode 100644
index 000000000..c40a26e8f
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content;
+import org.apache.paimon.utils.ObjectSerializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Serializer for {@link IcebergManifestFileMeta}. */
+public class IcebergManifestFileMetaSerializer extends 
ObjectSerializer<IcebergManifestFileMeta> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IcebergPartitionSummarySerializer partitionSummarySerializer;
+
+    public IcebergManifestFileMetaSerializer() {
+        super(IcebergManifestFileMeta.schema());
+        this.partitionSummarySerializer = new 
IcebergPartitionSummarySerializer();
+    }
+
+    @Override
+    public InternalRow toRow(IcebergManifestFileMeta file) {
+        return GenericRow.of(
+                BinaryString.fromString(file.manifestPath()),
+                file.manifestLength(),
+                file.partitionSpecId(),
+                file.content().id(),
+                file.sequenceNumber(),
+                file.minSequenceNumber(),
+                file.addedSnapshotId(),
+                file.addedFilesCount(),
+                file.existingFilesCount(),
+                file.deletedFilesCount(),
+                file.addedRowsCount(),
+                file.existingRowsCount(),
+                file.deletedRowsCount(),
+                new GenericArray(
+                        file.partitions().stream()
+                                .map(partitionSummarySerializer::toRow)
+                                .toArray(InternalRow[]::new)));
+    }
+
+    @Override
+    public IcebergManifestFileMeta fromRow(InternalRow row) {
+        return new IcebergManifestFileMeta(
+                row.getString(0).toString(),
+                row.getLong(1),
+                row.getInt(2),
+                Content.fromId(row.getInt(3)),
+                row.getLong(4),
+                row.getLong(5),
+                row.getLong(6),
+                row.getInt(7),
+                row.getInt(8),
+                row.getInt(9),
+                row.getLong(10),
+                row.getLong(11),
+                row.getLong(12),
+                toPartitionSummaries(row.getArray(13)));
+    }
+
+    private List<IcebergPartitionSummary> toPartitionSummaries(InternalArray 
array) {
+        List<IcebergPartitionSummary> summaries = new ArrayList<>();
+        for (int i = 0; i < array.size(); i++) {
+            summaries.add(
+                    partitionSummarySerializer.fromRow(
+                            array.getRow(i, 
partitionSummarySerializer.numFields())));
+        }
+        return summaries;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
new file mode 100644
index 000000000..e247b0238
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.PathFactory;
+
+/**
+ * This file includes several Iceberg {@link IcebergManifestFileMeta}s, representing the
+ * additional changes since last snapshot.
+ *
+ * <p>It is an {@link ObjectsFile} whose records are converted with an {@link
+ * IcebergManifestFileMetaSerializer}; all other constructor arguments are handed to the parent
+ * class unchanged.
+ */
+public class IcebergManifestList extends ObjectsFile<IcebergManifestFileMeta> {
+
+    public IcebergManifestList(
+            FileIO fileIO,
+            FormatReaderFactory readerFactory,
+            FormatWriterFactory writerFactory,
+            String compression,
+            PathFactory pathFactory) {
+        super(
+                fileIO,
+                new IcebergManifestFileMetaSerializer(),
+                readerFactory,
+                writerFactory,
+                compression,
+                pathFactory,
+                // NOTE(review): presumably the optional cache of the parent class, disabled
+                // here; confirm against the ObjectsFile constructor.
+                null);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java
new file mode 100644
index 000000000..e47a8fe00
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Iceberg partition summary stored in manifest file.
+ *
+ * <p>Holds two flags (contains null, contains NaN) and the serialized lower and upper bound of
+ * one partition field.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#manifest-lists">Iceberg spec</a>.
+ */
+public class IcebergPartitionSummary {
+
+    private final boolean containsNull;
+    private final boolean containsNan;
+    // bounds are kept as raw bytes; the schema declares both as nullable
+    private final byte[] lowerBound;
+    private final byte[] upperBound;
+
+    public IcebergPartitionSummary(
+            boolean containsNull, boolean containsNan, byte[] lowerBound, byte[] upperBound) {
+        this.containsNull = containsNull;
+        this.containsNan = containsNan;
+        this.lowerBound = lowerBound;
+        this.upperBound = upperBound;
+    }
+
+    public boolean containsNull() {
+        return containsNull;
+    }
+
+    public boolean containsNan() {
+        return containsNan;
+    }
+
+    /** Returns the stored array itself, not a copy. */
+    public byte[] lowerBound() {
+        return lowerBound;
+    }
+
+    /** Returns the stored array itself, not a copy. */
+    public byte[] upperBound() {
+        return upperBound;
+    }
+
+    /** Builds the non-null row type of a partition summary. */
+    public static RowType schema() {
+        List<DataField> columns = new ArrayList<>();
+        // flags
+        columns.add(new DataField(509, "contains_null", DataTypes.BOOLEAN().notNull()));
+        columns.add(new DataField(518, "contains_nan", DataTypes.BOOLEAN()));
+        // bounds
+        columns.add(new DataField(510, "lower_bound", DataTypes.BYTES()));
+        columns.add(new DataField(511, "upper_bound", DataTypes.BYTES()));
+        RowType rowType = new RowType(columns);
+        return (RowType) rowType.notNull();
+    }
+
+    /** Two summaries are equal when both flags match and both bounds have equal content. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        IcebergPartitionSummary other = (IcebergPartitionSummary) o;
+        if (containsNull != other.containsNull || containsNan != other.containsNan) {
+            return false;
+        }
+        return Arrays.equals(lowerBound, other.lowerBound)
+                && Arrays.equals(upperBound, other.upperBound);
+    }
+
+    /** Combines the flags with the content hashes of both bounds. */
+    @Override
+    public int hashCode() {
+        int flagsHash = Objects.hash(containsNull, containsNan);
+        int withLower = 31 * flagsHash + Arrays.hashCode(lowerBound);
+        return 31 * withLower + Arrays.hashCode(upperBound);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java
new file mode 100644
index 000000000..2ef6ceb1c
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.ObjectSerializer;
+
+/** Serializer for {@link IcebergPartitionSummary}. */
+public class IcebergPartitionSummarySerializer extends 
ObjectSerializer<IcebergPartitionSummary> {
+
+    public IcebergPartitionSummarySerializer() {
+        super(IcebergPartitionSummary.schema());
+    }
+
+    @Override
+    public InternalRow toRow(IcebergPartitionSummary record) {
+        return GenericRow.of(
+                record.containsNull(),
+                record.containsNan(),
+                record.lowerBound(),
+                record.upperBound());
+    }
+
+    @Override
+    public IcebergPartitionSummary fromRow(InternalRow row) {
+        return new IcebergPartitionSummary(
+                row.getBoolean(0), row.getBoolean(1), row.getBinary(2), 
row.getBinary(3));
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
new file mode 100644
index 000000000..fd05183b6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DecimalType;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * {@link DataField} in Iceberg.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#schemas">Iceberg spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergDataField {
+
+    // json property names of a single schema field
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_NAME = "name";
+    private static final String FIELD_REQUIRED = "required";
+    private static final String FIELD_TYPE = "type";
+    private static final String FIELD_DOC = "doc";
+
+    @JsonProperty(FIELD_ID)
+    private final int id;
+
+    @JsonProperty(FIELD_NAME)
+    private final String name;
+
+    @JsonProperty(FIELD_REQUIRED)
+    private final boolean required;
+
+    // type name as produced by toTypeString, e.g. "long" or "decimal(10, 2)"
+    @JsonProperty(FIELD_TYPE)
+    private final String type;
+
+    @JsonProperty(FIELD_DOC)
+    private final String doc;
+
+    /**
+     * Converts a Paimon {@link DataField}, keeping its id, name and description.
+     *
+     * <p>The field is marked required exactly when the Paimon type is not nullable.
+     *
+     * @param dataField the Paimon field to convert
+     * @throws UnsupportedOperationException if the type root of the field is not one of the
+     *     supported ones
+     */
+    public IcebergDataField(DataField dataField) {
+        this(
+                dataField.id(),
+                dataField.name(),
+                !dataField.type().isNullable(),
+                toTypeString(dataField.type()),
+                dataField.description());
+    }
+
+    /** Creates a field from its raw json properties; also used for deserialization. */
+    @JsonCreator
+    public IcebergDataField(
+            @JsonProperty(FIELD_ID) int id,
+            @JsonProperty(FIELD_NAME) String name,
+            @JsonProperty(FIELD_REQUIRED) boolean required,
+            @JsonProperty(FIELD_TYPE) String type,
+            @JsonProperty(FIELD_DOC) String doc) {
+        this.id = id;
+        this.name = name;
+        this.required = required;
+        this.type = type;
+        this.doc = doc;
+    }
+
+    @JsonGetter(FIELD_ID)
+    public int id() {
+        return id;
+    }
+
+    @JsonGetter(FIELD_NAME)
+    public String name() {
+        return name;
+    }
+
+    @JsonGetter(FIELD_REQUIRED)
+    public boolean required() {
+        return required;
+    }
+
+    @JsonGetter(FIELD_TYPE)
+    public String type() {
+        return type;
+    }
+
+    @JsonGetter(FIELD_DOC)
+    public String doc() {
+        return doc;
+    }
+
+    /**
+     * Maps a Paimon type to the type name written into the schema json.
+     *
+     * <p>Supported roots are BOOLEAN, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, CHAR/VARCHAR (both
+     * {@code string}), BINARY/VARBINARY (both {@code binary}) and DECIMAL.
+     *
+     * @throws UnsupportedOperationException for any other type root
+     */
+    private static String toTypeString(DataType dataType) {
+        switch (dataType.getTypeRoot()) {
+            case BOOLEAN:
+                return "boolean";
+            case INTEGER:
+                return "int";
+            case BIGINT:
+                return "long";
+            case FLOAT:
+                return "float";
+            case DOUBLE:
+                return "double";
+            case DATE:
+                return "date";
+            case CHAR:
+            case VARCHAR:
+                return "string";
+            case BINARY:
+            case VARBINARY:
+                return "binary";
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) dataType;
+                // Plain concatenation instead of String.format("%d"): the formatter localizes
+                // digits for some default locales, which would corrupt the type name.
+                return "decimal("
+                        + decimalType.getPrecision()
+                        + ", "
+                        + decimalType.getScale()
+                        + ")";
+            default:
+                throw new UnsupportedOperationException("Unsupported data type: " + dataType);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, name, required, type, doc);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof IcebergDataField)) {
+            return false;
+        }
+
+        IcebergDataField that = (IcebergDataField) o;
+        return id == that.id
+                && Objects.equals(name, that.name)
+                && required == that.required
+                && Objects.equals(type, that.type)
+                && Objects.equals(doc, that.doc);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
new file mode 100644
index 000000000..a3af25a1c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Iceberg's metadata json file.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#table-metadata-fields">Iceberg spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergMetadata {
+
+    /** Format version written by the convenience constructor. */
+    public static final int CURRENT_FORMAT_VERSION = 2;
+
+    // json property names of the metadata file
+    private static final String FIELD_FORMAT_VERSION = "format-version";
+    private static final String FIELD_TABLE_UUID = "table-uuid";
+    private static final String FIELD_LOCATION = "location";
+    private static final String FIELD_LAST_SEQUENCE_NUMBER = "last-sequence-number";
+    private static final String FIELD_LAST_UPDATED_MS = "last-updated-ms";
+    private static final String FIELD_LAST_COLUMN_ID = "last-column-id";
+    private static final String FIELD_SCHEMAS = "schemas";
+    private static final String FIELD_CURRENT_SCHEMA_ID = "current-schema-id";
+    private static final String FIELD_PARTITION_SPECS = "partition-specs";
+    private static final String FIELD_DEFAULT_SPEC_ID = "default-spec-id";
+    private static final String FIELD_LAST_PARTITION_ID = "last-partition-id";
+    private static final String FIELD_SORT_ORDERS = "sort-orders";
+    private static final String FIELD_DEFAULT_SORT_ORDER_ID = "default-sort-order-id";
+    private static final String FIELD_SNAPSHOTS = "snapshots";
+    private static final String FIELD_CURRENT_SNAPSHOT_ID = "current-snapshot-id";
+
+    // NOTE: the declaration order of fields, creator parameters and getters is kept stable on
+    // purpose, as it may influence the order of properties in the produced json.
+
+    @JsonProperty(FIELD_FORMAT_VERSION)
+    private final int formatVersion;
+
+    @JsonProperty(FIELD_TABLE_UUID)
+    private final String tableUuid;
+
+    @JsonProperty(FIELD_LOCATION)
+    private final String location;
+
+    @JsonProperty(FIELD_LAST_SEQUENCE_NUMBER)
+    private final long lastSequenceNumber;
+
+    @JsonProperty(FIELD_LAST_UPDATED_MS)
+    private final long lastUpdatedMs;
+
+    @JsonProperty(FIELD_LAST_COLUMN_ID)
+    private final int lastColumnId;
+
+    @JsonProperty(FIELD_SCHEMAS)
+    private final List<IcebergSchema> schemas;
+
+    @JsonProperty(FIELD_CURRENT_SCHEMA_ID)
+    private final int currentSchemaId;
+
+    @JsonProperty(FIELD_PARTITION_SPECS)
+    private final List<IcebergPartitionSpec> partitionSpecs;
+
+    @JsonProperty(FIELD_DEFAULT_SPEC_ID)
+    private final int defaultSpecId;
+
+    @JsonProperty(FIELD_LAST_PARTITION_ID)
+    private final int lastPartitionId;
+
+    @JsonProperty(FIELD_SORT_ORDERS)
+    private final List<IcebergSortOrder> sortOrders;
+
+    @JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID)
+    private final int defaultSortOrderId;
+
+    @JsonProperty(FIELD_SNAPSHOTS)
+    private final List<IcebergSnapshot> snapshots;
+
+    // NOTE(review): snapshot ids are 64-bit values in the Iceberg spec; confirm that an int is
+    // intended here.
+    @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID)
+    private final int currentSnapshotId;
+
+    /**
+     * Creates metadata for a freshly written file.
+     *
+     * <p>Fills in {@link #CURRENT_FORMAT_VERSION}, the current system time as last-updated
+     * timestamp, {@link IcebergPartitionSpec#SPEC_ID} as default spec id, and a single new {@link
+     * IcebergSortOrder} together with {@link IcebergSortOrder#ORDER_ID} as default sort order id.
+     */
+    public IcebergMetadata(
+            String tableUuid,
+            String location,
+            long lastSequenceNumber,
+            int lastColumnId,
+            List<IcebergSchema> schemas,
+            int currentSchemaId,
+            List<IcebergPartitionSpec> partitionSpecs,
+            int lastPartitionId,
+            List<IcebergSnapshot> snapshots,
+            int currentSnapshotId) {
+        this(
+                CURRENT_FORMAT_VERSION,
+                tableUuid,
+                location,
+                lastSequenceNumber,
+                // last-updated-ms
+                System.currentTimeMillis(),
+                lastColumnId,
+                schemas,
+                currentSchemaId,
+                partitionSpecs,
+                // default-spec-id
+                IcebergPartitionSpec.SPEC_ID,
+                lastPartitionId,
+                // sort-orders and default-sort-order-id
+                Collections.singletonList(new IcebergSortOrder()),
+                IcebergSortOrder.ORDER_ID,
+                snapshots,
+                currentSnapshotId);
+    }
+
+    /** Creates metadata from all raw json properties; also used for deserialization. */
+    @JsonCreator
+    public IcebergMetadata(
+            @JsonProperty(FIELD_FORMAT_VERSION) int formatVersion,
+            @JsonProperty(FIELD_TABLE_UUID) String tableUuid,
+            @JsonProperty(FIELD_LOCATION) String location,
+            @JsonProperty(FIELD_LAST_SEQUENCE_NUMBER) long lastSequenceNumber,
+            @JsonProperty(FIELD_LAST_UPDATED_MS) long lastUpdatedMs,
+            @JsonProperty(FIELD_LAST_COLUMN_ID) int lastColumnId,
+            @JsonProperty(FIELD_SCHEMAS) List<IcebergSchema> schemas,
+            @JsonProperty(FIELD_CURRENT_SCHEMA_ID) int currentSchemaId,
+            @JsonProperty(FIELD_PARTITION_SPECS) List<IcebergPartitionSpec> partitionSpecs,
+            @JsonProperty(FIELD_DEFAULT_SPEC_ID) int defaultSpecId,
+            @JsonProperty(FIELD_LAST_PARTITION_ID) int lastPartitionId,
+            @JsonProperty(FIELD_SORT_ORDERS) List<IcebergSortOrder> sortOrders,
+            @JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID) int defaultSortOrderId,
+            @JsonProperty(FIELD_SNAPSHOTS) List<IcebergSnapshot> snapshots,
+            @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) int currentSnapshotId) {
+        this.formatVersion = formatVersion;
+        this.tableUuid = tableUuid;
+        this.location = location;
+        this.lastSequenceNumber = lastSequenceNumber;
+        this.lastUpdatedMs = lastUpdatedMs;
+        this.lastColumnId = lastColumnId;
+        this.schemas = schemas;
+        this.currentSchemaId = currentSchemaId;
+        this.partitionSpecs = partitionSpecs;
+        this.defaultSpecId = defaultSpecId;
+        this.lastPartitionId = lastPartitionId;
+        this.sortOrders = sortOrders;
+        this.defaultSortOrderId = defaultSortOrderId;
+        this.snapshots = snapshots;
+        this.currentSnapshotId = currentSnapshotId;
+    }
+
+    /** Value of the {@code format-version} property. */
+    @JsonGetter(FIELD_FORMAT_VERSION)
+    public int formatVersion() {
+        return formatVersion;
+    }
+
+    /** Value of the {@code table-uuid} property. */
+    @JsonGetter(FIELD_TABLE_UUID)
+    public String tableUuid() {
+        return tableUuid;
+    }
+
+    /** Value of the {@code location} property. */
+    @JsonGetter(FIELD_LOCATION)
+    public String location() {
+        return location;
+    }
+
+    /** Value of the {@code last-sequence-number} property. */
+    @JsonGetter(FIELD_LAST_SEQUENCE_NUMBER)
+    public long lastSequenceNumber() {
+        return lastSequenceNumber;
+    }
+
+    /** Value of the {@code last-updated-ms} property. */
+    @JsonGetter(FIELD_LAST_UPDATED_MS)
+    public long lastUpdatedMs() {
+        return lastUpdatedMs;
+    }
+
+    /** Value of the {@code last-column-id} property. */
+    @JsonGetter(FIELD_LAST_COLUMN_ID)
+    public int lastColumnId() {
+        return lastColumnId;
+    }
+
+    /** Value of the {@code schemas} property; the internal list is returned as is. */
+    @JsonGetter(FIELD_SCHEMAS)
+    public List<IcebergSchema> schemas() {
+        return schemas;
+    }
+
+    /** Value of the {@code current-schema-id} property. */
+    @JsonGetter(FIELD_CURRENT_SCHEMA_ID)
+    public int currentSchemaId() {
+        return currentSchemaId;
+    }
+
+    /** Value of the {@code partition-specs} property; the internal list is returned as is. */
+    @JsonGetter(FIELD_PARTITION_SPECS)
+    public List<IcebergPartitionSpec> partitionSpecs() {
+        return partitionSpecs;
+    }
+
+    /** Value of the {@code default-spec-id} property. */
+    @JsonGetter(FIELD_DEFAULT_SPEC_ID)
+    public int defaultSpecId() {
+        return defaultSpecId;
+    }
+
+    /** Value of the {@code last-partition-id} property. */
+    @JsonGetter(FIELD_LAST_PARTITION_ID)
+    public int lastPartitionId() {
+        return lastPartitionId;
+    }
+
+    /** Value of the {@code sort-orders} property; the internal list is returned as is. */
+    @JsonGetter(FIELD_SORT_ORDERS)
+    public List<IcebergSortOrder> sortOrders() {
+        return sortOrders;
+    }
+
+    /** Value of the {@code default-sort-order-id} property. */
+    @JsonGetter(FIELD_DEFAULT_SORT_ORDER_ID)
+    public int defaultSortOrderId() {
+        return defaultSortOrderId;
+    }
+
+    /** Value of the {@code snapshots} property; the internal list is returned as is. */
+    @JsonGetter(FIELD_SNAPSHOTS)
+    public List<IcebergSnapshot> snapshots() {
+        return snapshots;
+    }
+
+    /** Value of the {@code current-snapshot-id} property. */
+    @JsonGetter(FIELD_CURRENT_SNAPSHOT_ID)
+    public int currentSnapshotId() {
+        return currentSnapshotId;
+    }
+
+    /**
+     * Looks up the first snapshot in {@link #snapshots()} whose id equals {@link
+     * #currentSnapshotId()}.
+     *
+     * @throws RuntimeException if no snapshot in the list has that id
+     */
+    public IcebergSnapshot currentSnapshot() {
+        return snapshots.stream()
+                .filter(candidate -> candidate.snapshotId() == currentSnapshotId)
+                .findFirst()
+                .orElseThrow(
+                        () ->
+                                new RuntimeException(
+                                        "Cannot find snapshot with id "
+                                                + currentSnapshotId
+                                                + ", this is unexpected."));
+    }
+
+    /** Serializes this metadata to a json string. */
+    public String toJson() {
+        return JsonSerdeUtil.toJson(this);
+    }
+
+    /** Parses metadata from a json string. */
+    public static IcebergMetadata fromJson(String json) {
+        return JsonSerdeUtil.fromJson(json, IcebergMetadata.class);
+    }
+
+    /**
+     * Reads the file at {@code path} as UTF-8 text and parses it as metadata json.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
+     */
+    public static IcebergMetadata fromPath(FileIO fileIO, Path path) {
+        try {
+            return fromJson(fileIO.readFileUtf8(path));
+        } catch (IOException cause) {
+            throw new RuntimeException("Failed to read Iceberg metadata from path " + path, cause);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                formatVersion,
+                tableUuid,
+                location,
+                lastSequenceNumber,
+                lastUpdatedMs,
+                lastColumnId,
+                schemas,
+                currentSchemaId,
+                partitionSpecs,
+                defaultSpecId,
+                lastPartitionId,
+                sortOrders,
+                defaultSortOrderId,
+                snapshots,
+                currentSnapshotId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof IcebergMetadata)) {
+            return false;
+        }
+
+        IcebergMetadata other = (IcebergMetadata) o;
+
+        // cheap primitive comparisons first
+        boolean samePrimitives =
+                formatVersion == other.formatVersion
+                        && lastSequenceNumber == other.lastSequenceNumber
+                        && lastUpdatedMs == other.lastUpdatedMs
+                        && lastColumnId == other.lastColumnId
+                        && currentSchemaId == other.currentSchemaId
+                        && defaultSpecId == other.defaultSpecId
+                        && lastPartitionId == other.lastPartitionId
+                        && defaultSortOrderId == other.defaultSortOrderId
+                        && currentSnapshotId == other.currentSnapshotId;
+        if (!samePrimitives) {
+            return false;
+        }
+
+        return Objects.equals(tableUuid, other.tableUuid)
+                && Objects.equals(location, other.location)
+                && Objects.equals(schemas, other.schemas)
+                && Objects.equals(partitionSpecs, other.partitionSpecs)
+                && Objects.equals(sortOrders, other.sortOrders)
+                && Objects.equals(snapshots, other.snapshots);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java
new file mode 100644
index 000000000..7be0d0493
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.types.DataField;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Partition field in Iceberg's partition spec.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#partition-specs">Iceberg spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergPartitionField {
+
+    // not sure why, but the sample in the Iceberg spec starts partition field ids at this value
+    public static final int FIRST_FIELD_ID = 1000;
+
+    // json property names of a partition field
+    private static final String FIELD_NAME = "name";
+    private static final String FIELD_TRANSFORM = "transform";
+    private static final String FIELD_SOURCE_ID = "source-id";
+    private static final String FIELD_FIELD_ID = "field-id";
+
+    @JsonProperty(FIELD_NAME)
+    private final String name;
+
+    @JsonProperty(FIELD_TRANSFORM)
+    private final String transform;
+
+    @JsonProperty(FIELD_SOURCE_ID)
+    private final int sourceId;
+
+    @JsonProperty(FIELD_FIELD_ID)
+    private final int fieldId;
+
+    /**
+     * Creates an identity-transformed partition field from a Paimon {@link DataField}.
+     *
+     * @param dataField the partition column; its name and id become the name and source id
+     * @param fieldId the id of this partition field
+     */
+    public IcebergPartitionField(DataField dataField, int fieldId) {
+        this(
+                dataField.name(),
+                // currently Paimon's partition value does not have any transformation
+                "identity",
+                dataField.id(),
+                fieldId);
+    }
+
+    /** Creates a partition field from its raw json properties; also used for deserialization. */
+    @JsonCreator
+    public IcebergPartitionField(
+            @JsonProperty(FIELD_NAME) String name,
+            @JsonProperty(FIELD_TRANSFORM) String transform,
+            @JsonProperty(FIELD_SOURCE_ID) int sourceId,
+            @JsonProperty(FIELD_FIELD_ID) int fieldId) {
+        this.name = name;
+        this.transform = transform;
+        this.sourceId = sourceId;
+        this.fieldId = fieldId;
+    }
+
+    /** Value of the {@code name} property. */
+    @JsonGetter(FIELD_NAME)
+    public String name() {
+        return name;
+    }
+
+    /** Value of the {@code transform} property. */
+    @JsonGetter(FIELD_TRANSFORM)
+    public String transform() {
+        return transform;
+    }
+
+    /** Value of the {@code source-id} property. */
+    @JsonGetter(FIELD_SOURCE_ID)
+    public int sourceId() {
+        return sourceId;
+    }
+
+    /** Value of the {@code field-id} property. */
+    @JsonGetter(FIELD_FIELD_ID)
+    public int fieldId() {
+        return fieldId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, transform, sourceId, fieldId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof IcebergPartitionField)) {
+            return false;
+        }
+
+        IcebergPartitionField other = (IcebergPartitionField) o;
+        if (sourceId != other.sourceId || fieldId != other.fieldId) {
+            return false;
+        }
+        return Objects.equals(name, other.name) && Objects.equals(transform, other.transform);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java
new file mode 100644
index 000000000..343a8c769
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Partition spec in Iceberg's metadata.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#partition-specs">Iceberg spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergPartitionSpec {
+
+    // Paimon does not support partition evolution, so the spec id is always 0
+    public static final int SPEC_ID = 0;
+
+    // json property names of a partition spec
+    private static final String FIELD_SPEC_ID = "spec-id";
+    private static final String FIELD_FIELDS = "fields";
+
+    @JsonProperty(FIELD_SPEC_ID)
+    private final int specId;
+
+    @JsonProperty(FIELD_FIELDS)
+    private final List<IcebergPartitionField> fields;
+
+    /** Creates a spec with id {@link #SPEC_ID} and the given partition fields. */
+    public IcebergPartitionSpec(List<IcebergPartitionField> fields) {
+        this(SPEC_ID, fields);
+    }
+
+    /** Creates a spec from its raw json properties; also used for deserialization. */
+    @JsonCreator
+    public IcebergPartitionSpec(
+            @JsonProperty(FIELD_SPEC_ID) int specId,
+            @JsonProperty(FIELD_FIELDS) List<IcebergPartitionField> fields) {
+        this.specId = specId;
+        this.fields = fields;
+    }
+
+    /** Value of the {@code spec-id} property. */
+    @JsonGetter(FIELD_SPEC_ID)
+    public int specId() {
+        return specId;
+    }
+
+    /** Value of the {@code fields} property; the internal list is returned as is. */
+    @JsonGetter(FIELD_FIELDS)
+    public List<IcebergPartitionField> fields() {
+        return fields;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(specId, fields);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof IcebergPartitionSpec)) {
+            return false;
+        }
+
+        IcebergPartitionSpec other = (IcebergPartitionSpec) o;
+        if (specId != other.specId) {
+            return false;
+        }
+        return Objects.equals(fields, other.fields);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
new file mode 100644
index 000000000..b3c82021e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.schema.TableSchema;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Schema in Iceberg's metadata.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#schemas">Iceberg spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergSchema {
+
+    // json property names of a schema
+    private static final String FIELD_TYPE = "type";
+    private static final String FIELD_SCHEMA_ID = "schema-id";
+    private static final String FIELD_FIELDS = "fields";
+
+    @JsonProperty(FIELD_TYPE)
+    private final String type;
+
+    @JsonProperty(FIELD_SCHEMA_ID)
+    private final int schemaId;
+
+    @JsonProperty(FIELD_FIELDS)
+    private final List<IcebergDataField> fields;
+
+    /**
+     * Converts a Paimon {@link TableSchema}: the schema id is cast to {@code int} and every field
+     * is converted to an {@link IcebergDataField}, in the same order.
+     */
+    public IcebergSchema(TableSchema tableSchema) {
+        this(
+                (int) tableSchema.id(),
+                tableSchema.fields().stream()
+                        .map(paimonField -> new IcebergDataField(paimonField))
+                        .collect(Collectors.toList()));
+    }
+
+    /** Creates a schema of type {@code "struct"} with the given id and fields. */
+    public IcebergSchema(int schemaId, List<IcebergDataField> fields) {
+        this("struct", schemaId, fields);
+    }
+
+    /** Creates a schema from its raw json properties; also used for deserialization. */
+    @JsonCreator
+    public IcebergSchema(
+            @JsonProperty(FIELD_TYPE) String type,
+            @JsonProperty(FIELD_SCHEMA_ID) int schemaId,
+            @JsonProperty(FIELD_FIELDS) List<IcebergDataField> fields) {
+        this.type = type;
+        this.schemaId = schemaId;
+        this.fields = fields;
+    }
+
+    /** Value of the {@code type} property. */
+    @JsonGetter(FIELD_TYPE)
+    public String type() {
+        return type;
+    }
+
+    /** Value of the {@code schema-id} property. */
+    @JsonGetter(FIELD_SCHEMA_ID)
+    public int schemaId() {
+        return schemaId;
+    }
+
+    /** Value of the {@code fields} property; the internal list is returned as is. */
+    @JsonGetter(FIELD_FIELDS)
+    public List<IcebergDataField> fields() {
+        return fields;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, schemaId, fields);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof IcebergSchema)) {
+            return false;
+        }
+
+        IcebergSchema other = (IcebergSchema) o;
+        if (!Objects.equals(type, other.type)) {
+            return false;
+        }
+        if (schemaId != other.schemaId) {
+            return false;
+        }
+        return Objects.equals(fields, other.fields);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
new file mode 100644
index 000000000..df0224d22
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Snapshot in Iceberg's metadata.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#snapshots">Iceberg spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergSnapshot {
+
+    // JSON property names of a serialized snapshot
+    private static final String FIELD_SEQUENCE_NUMBER = "sequence-number";
+    private static final String FIELD_SNAPSHOT_ID = "snapshot-id";
+    private static final String FIELD_TIMESTAMP_MS = "timestamp-ms";
+    private static final String FIELD_SUMMARY = "summary";
+    private static final String FIELD_MANIFEST_LIST = "manifest-list";
+    private static final String FIELD_SCHEMA_ID = "schema-id";
+
+    @JsonProperty(FIELD_SEQUENCE_NUMBER)
+    private final long sequenceNumber;
+
+    @JsonProperty(FIELD_SNAPSHOT_ID)
+    private final long snapshotId;
+
+    @JsonProperty(FIELD_TIMESTAMP_MS)
+    private final long timestampMs;
+
+    @JsonProperty(FIELD_SUMMARY)
+    private final IcebergSnapshotSummary summary;
+
+    @JsonProperty(FIELD_MANIFEST_LIST)
+    private final String manifestList;
+
+    @JsonProperty(FIELD_SCHEMA_ID)
+    private final int schemaId;
+
+    /**
+     * Creates a snapshot from all of its JSON properties. Annotated as the Jackson creator, so
+     * this is the constructor used when a snapshot is deserialized; unknown JSON properties are
+     * ignored.
+     *
+     * @param sequenceNumber value of the {@code sequence-number} property
+     * @param snapshotId value of the {@code snapshot-id} property
+     * @param timestampMs value of the {@code timestamp-ms} property
+     * @param summary value of the {@code summary} property
+     * @param manifestList value of the {@code manifest-list} property
+     * @param schemaId value of the {@code schema-id} property
+     */
+    @JsonCreator
+    public IcebergSnapshot(
+            @JsonProperty(FIELD_SEQUENCE_NUMBER) long sequenceNumber,
+            @JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId,
+            @JsonProperty(FIELD_TIMESTAMP_MS) long timestampMs,
+            @JsonProperty(FIELD_SUMMARY) IcebergSnapshotSummary summary,
+            @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
+            @JsonProperty(FIELD_SCHEMA_ID) int schemaId) {
+        this.sequenceNumber = sequenceNumber;
+        this.snapshotId = snapshotId;
+        this.timestampMs = timestampMs;
+        this.summary = summary;
+        this.manifestList = manifestList;
+        this.schemaId = schemaId;
+    }
+
+    /** Returns the value serialized as {@code sequence-number}. */
+    @JsonGetter(FIELD_SEQUENCE_NUMBER)
+    public long sequenceNumber() {
+        return this.sequenceNumber;
+    }
+
+    /** Returns the value serialized as {@code snapshot-id}. */
+    @JsonGetter(FIELD_SNAPSHOT_ID)
+    public long snapshotId() {
+        return this.snapshotId;
+    }
+
+    /** Returns the value serialized as {@code timestamp-ms}. */
+    @JsonGetter(FIELD_TIMESTAMP_MS)
+    public long timestampMs() {
+        return this.timestampMs;
+    }
+
+    /** Returns the value serialized as {@code summary}. */
+    @JsonGetter(FIELD_SUMMARY)
+    public IcebergSnapshotSummary summary() {
+        return this.summary;
+    }
+
+    /** Returns the value serialized as {@code manifest-list}. */
+    @JsonGetter(FIELD_MANIFEST_LIST)
+    public String manifestList() {
+        return this.manifestList;
+    }
+
+    /** Returns the value serialized as {@code schema-id}. */
+    @JsonGetter(FIELD_SCHEMA_ID)
+    public int schemaId() {
+        return this.schemaId;
+    }
+
+    /** Hashes the same six properties that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                sequenceNumber, snapshotId, timestampMs, summary, manifestList, schemaId);
+    }
+
+    /** Two snapshots are equal when all six of their properties are equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o instanceof IcebergSnapshot) {
+            IcebergSnapshot other = (IcebergSnapshot) o;
+            return sequenceNumber == other.sequenceNumber
+                    && snapshotId == other.snapshotId
+                    && timestampMs == other.timestampMs
+                    && Objects.equals(summary, other.summary)
+                    && Objects.equals(manifestList, other.manifestList)
+                    && schemaId == other.schemaId;
+        }
+        return false;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java
new file mode 100644
index 000000000..0c70331ee
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Snapshot summary in Iceberg's snapshot.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#snapshots">Iceberg spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergSnapshotSummary {
+
+    // JSON property name of the only property kept by this class
+    private static final String FIELD_OPERATION = "operation";
+
+    // operation values exposed to callers
+    public static final String OPERATION_APPEND = "append";
+    public static final String OPERATION_OVERWRITE = "overwrite";
+
+    @JsonProperty(FIELD_OPERATION)
+    private final String operation;
+
+    /**
+     * Creates a summary for the given operation. Annotated as the Jackson creator, so this is the
+     * constructor used when a summary is deserialized; unknown JSON properties are ignored.
+     *
+     * @param operation value of the {@code operation} property
+     */
+    @JsonCreator
+    public IcebergSnapshotSummary(@JsonProperty(FIELD_OPERATION) String operation) {
+        this.operation = operation;
+    }
+
+    /** Returns the value serialized as {@code operation}. */
+    @JsonGetter(FIELD_OPERATION)
+    public String operation() {
+        return this.operation;
+    }
+
+    /** Hashes the operation, the only property that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(operation);
+    }
+
+    /** Two summaries are equal when their operations are equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o instanceof IcebergSnapshotSummary) {
+            IcebergSnapshotSummary other = (IcebergSnapshotSummary) o;
+            return Objects.equals(operation, other.operation);
+        }
+        return false;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java
new file mode 100644
index 000000000..0ff867006
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Sort order in Iceberg's metadata.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#sort-orders">Iceberg spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergSortOrder {
+
+    // currently unsupported
+    public static final int ORDER_ID = 0;
+
+    // JSON property names of a serialized sort order
+    private static final String FIELD_ORDER_ID = "order-id";
+    private static final String FIELD_FIELDS = "fields";
+
+    @JsonProperty(FIELD_ORDER_ID)
+    private final int orderId;
+
+    // currently always empty
+    @JsonProperty(FIELD_FIELDS)
+    private final List<Object> fields;
+
+    /** Creates a sort order with id {@link #ORDER_ID} and a new, empty list of fields. */
+    public IcebergSortOrder() {
+        this(ORDER_ID, new ArrayList<>());
+    }
+
+    /**
+     * Creates a sort order from all of its JSON properties. Annotated as the Jackson creator, so
+     * this is the constructor used when a sort order is deserialized; unknown JSON properties are
+     * ignored.
+     *
+     * @param orderId value of the {@code order-id} property
+     * @param fields value of the {@code fields} property
+     */
+    @JsonCreator
+    public IcebergSortOrder(
+            @JsonProperty(FIELD_ORDER_ID) int orderId,
+            @JsonProperty(FIELD_FIELDS) List<Object> fields) {
+        this.orderId = orderId;
+        this.fields = fields;
+    }
+
+    /** Returns the value serialized as {@code order-id}. */
+    @JsonGetter(FIELD_ORDER_ID)
+    public int orderId() {
+        return this.orderId;
+    }
+
+    /** Returns the value serialized as {@code fields}; the internal list is not copied. */
+    @JsonGetter(FIELD_FIELDS)
+    public List<Object> fields() {
+        return this.fields;
+    }
+
+    /** Hashes the same two properties that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(orderId, fields);
+    }
+
+    /** Two sort orders are equal when their order ids and fields are equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o instanceof IcebergSortOrder) {
+            IcebergSortOrder other = (IcebergSortOrder) o;
+            return orderId == other.orderId && Objects.equals(fields, other.fields);
+        }
+        return false;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 108454bb7..3839896eb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergCommitCallback;
 import org.apache.paimon.metastore.AddPartitionCommitCallback;
 import org.apache.paimon.metastore.AddPartitionTagCallback;
 import org.apache.paimon.metastore.MetastoreClient;
@@ -351,7 +352,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
         return new TableCommitImpl(
                 store().newCommit(commitUser),
-                createCommitCallbacks(),
+                createCommitCallbacks(commitUser),
                 snapshotExpire,
                 options.writeOnly() ? null : 
store().newPartitionExpire(commitUser),
                 options.writeOnly() ? null : store().newTagCreationManager(),
@@ -363,17 +364,19 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 coreOptions().forceCreatingSnapshot());
     }
 
-    private List<CommitCallback> createCommitCallbacks() {
+    private List<CommitCallback> createCommitCallbacks(String commitUser) {
         List<CommitCallback> callbacks =
                 new 
ArrayList<>(CallbackUtils.loadCommitCallbacks(coreOptions()));
         CoreOptions options = coreOptions();
         MetastoreClient.Factory metastoreClientFactory =
                 catalogEnvironment.metastoreClientFactory();
+
         if (options.partitionedTableInMetastore()
                 && metastoreClientFactory != null
                 && tableSchema.partitionKeys().size() > 0) {
             callbacks.add(new 
AddPartitionCommitCallback(metastoreClientFactory.create()));
         }
+
         TagPreview tagPreview = TagPreview.create(options);
         if (options.tagToPartitionField() != null
                 && tagPreview != null
@@ -386,6 +389,11 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                             tagPreview);
             callbacks.add(callback);
         }
+
+        if (options.metadataIcebergCompatible()) {
+            callbacks.add(new IcebergCommitCallback(this, commitUser));
+        }
+
         return callbacks;
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
new file mode 100644
index 000000000..25700705d
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for Iceberg compatibility. */
+public class IcebergCompatibilityTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    /**
+     * Runs the compatibility check on a table without partition keys whose primary key is
+     * {@code (k1, k2)}. Keys are drawn from small random ranges, so duplicate keys are likely
+     * among the 1000 generated records.
+     */
+    @Test
+    public void testUnpartitionedPrimaryKeyTable() throws Exception {
+        DataType[] fieldTypes = {
+            DataTypes.INT(), DataTypes.STRING(), DataTypes.INT(), DataTypes.BIGINT()
+        };
+        String[] fieldNames = {"k1", "k2", "v1", "v2"};
+        RowType rowType = RowType.of(fieldTypes, fieldNames);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+        int numRecords = 1000;
+        List<TestRecord> testRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            int k1 = rnd.nextInt(0, 100);
+            String k2 = String.valueOf(rnd.nextInt(1000, 1010));
+            int v1 = rnd.nextInt();
+            long v2 = rnd.nextLong();
+
+            String key = String.format("%d|%s", k1, k2);
+            String value = String.format("%d|%d", v1, v2);
+            GenericRow row = GenericRow.of(k1, BinaryString.fromString(k2), v1, v2);
+            testRecords.add(new TestRecord(BinaryRow.EMPTY_ROW, key, value, row));
+        }
+
+        // render Iceberg records in the same textual form as the expected keys and values
+        Function<Record, String> toKey =
+                r -> String.format("%d|%s", r.get(0, Integer.class), r.get(1, String.class));
+        Function<Record, String> toValue =
+                r -> String.format("%d|%d", r.get(2, Integer.class), r.get(3, Long.class));
+
+        runCompatibilityTest(
+                rowType,
+                Collections.emptyList(),
+                Arrays.asList("k1", "k2"),
+                testRecords,
+                toKey,
+                toValue);
+    }
+
+    /**
+     * Runs the compatibility check on a table partitioned by {@code (pt1, pt2)} whose primary key
+     * is {@code (pt1, pt2, k)}. Partition and key values are drawn from small random ranges, so
+     * duplicate keys are likely among the 1000 generated records.
+     */
+    @Test
+    public void testPartitionedPrimaryKeyTable() throws Exception {
+        DataType[] fieldTypes = {
+            DataTypes.INT(),
+            DataTypes.STRING(),
+            DataTypes.STRING(),
+            DataTypes.INT(),
+            DataTypes.BIGINT()
+        };
+        String[] fieldNames = {"pt1", "pt2", "k", "v1", "v2"};
+        RowType rowType = RowType.of(fieldTypes, fieldNames);
+
+        // builds the two-field partition row for a given (pt1, pt2) pair
+        BiFunction<Integer, String, BinaryRow> toPartition =
+                (p1, p2) -> {
+                    BinaryRow partitionRow = new BinaryRow(2);
+                    BinaryRowWriter rowWriter = new BinaryRowWriter(partitionRow);
+                    rowWriter.writeInt(0, p1);
+                    rowWriter.writeString(1, BinaryString.fromString(p2));
+                    rowWriter.complete();
+                    return partitionRow;
+                };
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+        int numRecords = 1000;
+        List<TestRecord> testRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            int pt1 = rnd.nextInt(0, 2);
+            String pt2 = String.valueOf(rnd.nextInt(10, 12));
+            String k = String.valueOf(rnd.nextInt(0, 100));
+            int v1 = rnd.nextInt();
+            long v2 = rnd.nextLong();
+
+            String key = String.format("%d|%s|%s", pt1, pt2, k);
+            String value = String.format("%d|%d", v1, v2);
+            GenericRow row =
+                    GenericRow.of(
+                            pt1, BinaryString.fromString(pt2), BinaryString.fromString(k), v1, v2);
+            testRecords.add(new TestRecord(toPartition.apply(pt1, pt2), key, value, row));
+        }
+
+        // render Iceberg records in the same textual form as the expected keys and values
+        Function<Record, String> toKey =
+                r ->
+                        String.format(
+                                "%d|%s|%s",
+                                r.get(0, Integer.class),
+                                r.get(1, String.class),
+                                r.get(2, String.class));
+        Function<Record, String> toValue =
+                r -> String.format("%d|%d", r.get(3, Integer.class), r.get(4, Long.class));
+
+        runCompatibilityTest(
+                rowType,
+                Arrays.asList("pt1", "pt2"),
+                Arrays.asList("pt1", "pt2", "k"),
+                testRecords,
+                toKey,
+                toValue);
+    }
+
+    /**
+     * Runs the compatibility check on a table without primary keys that contains one column for
+     * each of the listed data types.
+     *
+     * <p>The table is created without partition keys, so {@code pt} is an ordinary column here.
+     * Each record is rendered into a single string used as the key; the value is always empty.
+     */
+    @Test
+    public void testAppendOnlyTableWithAllTypes() throws Exception {
+        // Binary values are encoded and decoded with a fixed charset so that the test does not
+        // depend on the platform default charset.
+        java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.BOOLEAN(),
+                            DataTypes.BIGINT(),
+                            DataTypes.FLOAT(),
+                            DataTypes.DOUBLE(),
+                            DataTypes.DECIMAL(8, 3),
+                            DataTypes.CHAR(20),
+                            DataTypes.STRING(),
+                            DataTypes.BINARY(20),
+                            DataTypes.VARBINARY(20),
+                            DataTypes.DATE()
+                        },
+                        new String[] {
+                            "pt",
+                            "v_boolean",
+                            "v_bigint",
+                            "v_float",
+                            "v_double",
+                            "v_decimal",
+                            "v_char",
+                            "v_varchar",
+                            "v_binary",
+                            "v_varbinary",
+                            "v_date"
+                        });
+
+        Function<Integer, BinaryRow> binaryRow =
+                (pt) -> {
+                    BinaryRow b = new BinaryRow(1);
+                    BinaryRowWriter writer = new BinaryRowWriter(b);
+                    writer.writeInt(0, pt);
+                    writer.complete();
+                    return b;
+                };
+
+        int numRecords = 1000;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<TestRecord> testRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            int pt = random.nextInt(0, 2);
+            boolean vBoolean = random.nextBoolean();
+            long vBigInt = random.nextLong();
+            float vFloat = random.nextFloat();
+            double vDouble = random.nextDouble();
+            Decimal vDecimal = Decimal.fromUnscaledLong(random.nextLong(0, 100000000), 8, 3);
+            String vChar = String.valueOf(random.nextInt());
+            String vVarChar = String.valueOf(random.nextInt());
+            byte[] vBinary = String.valueOf(random.nextInt()).getBytes(utf8);
+            byte[] vVarBinary = String.valueOf(random.nextInt()).getBytes(utf8);
+            int vDate = random.nextInt(0, 30000);
+
+            String k =
+                    String.format(
+                            "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s",
+                            pt,
+                            vBoolean,
+                            vBigInt,
+                            vFloat,
+                            vDouble,
+                            vDecimal,
+                            vChar,
+                            vVarChar,
+                            new String(vBinary, utf8),
+                            new String(vVarBinary, utf8),
+                            LocalDate.ofEpochDay(vDate));
+            testRecords.add(
+                    new TestRecord(
+                            binaryRow.apply(pt),
+                            k,
+                            "",
+                            GenericRow.of(
+                                    pt,
+                                    vBoolean,
+                                    vBigInt,
+                                    vFloat,
+                                    vDouble,
+                                    vDecimal,
+                                    BinaryString.fromString(vChar),
+                                    BinaryString.fromString(vVarChar),
+                                    vBinary,
+                                    vVarBinary,
+                                    vDate)));
+        }
+
+        runCompatibilityTest(
+                rowType,
+                Collections.emptyList(),
+                Collections.emptyList(),
+                testRecords,
+                r ->
+                        String.format(
+                                "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s",
+                                r.get(0),
+                                r.get(1),
+                                r.get(2),
+                                r.get(3),
+                                r.get(4),
+                                r.get(5),
+                                r.get(6),
+                                r.get(7),
+                                new String(r.get(8, ByteBuffer.class).array(), utf8),
+                                new String(r.get(9, ByteBuffer.class).array(), utf8),
+                                r.get(10)),
+                r -> "");
+    }
+
+    /**
+     * Creates a table with {@code CoreOptions.METADATA_ICEBERG_COMPATIBLE} enabled and the avro
+     * file format, writes {@code testRecords} into it in a single commit, reads the table back
+     * through Iceberg's {@link HadoopCatalog} and asserts that both sides see the same key-value
+     * pairs.
+     *
+     * <p>For tables with primary keys the bucket number is set to 2 and every bucket of every
+     * written partition is compacted before the commit.
+     *
+     * @param rowType row type of the table
+     * @param partitionKeys partition keys of the table, may be empty
+     * @param primaryKeys primary keys of the table, may be empty
+     * @param testRecords records to write; when keys repeat, the value of the last record wins
+     * @param icebergRecordToKey renders an Iceberg record into the same form as the expected key
+     * @param icebergRecordToValue renders an Iceberg record into the same form as the expected
+     *     value
+     */
+    private void runCompatibilityTest(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<TestRecord> testRecords,
+            Function<Record, String> icebergRecordToKey,
+            Function<Record, String> icebergRecordToValue)
+            throws Exception {
+        // shared by the table option and by the compaction loop so that the two cannot diverge
+        int numBuckets = 2;
+
+        LocalFileIO fileIO = LocalFileIO.create();
+        Path path = new Path(tempDir.toString());
+
+        Options options = new Options();
+        if (!primaryKeys.isEmpty()) {
+            options.set(CoreOptions.BUCKET, numBuckets);
+        }
+        options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
+        options.set(CoreOptions.FILE_FORMAT, "avro");
+        Schema schema =
+                new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");
+
+        FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path);
+        paimonCatalog.createDatabase("mydb", false);
+        Identifier paimonIdentifier = Identifier.create("mydb", "t");
+        paimonCatalog.createTable(paimonIdentifier, schema, false);
+        FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        Map<String, String> expected = new HashMap<>();
+        try {
+            for (TestRecord testRecord : testRecords) {
+                expected.put(testRecord.key, testRecord.value);
+                write.write(testRecord.record);
+            }
+
+            if (!primaryKeys.isEmpty()) {
+                for (BinaryRow partition :
+                        testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) {
+                    for (int b = 0; b < numBuckets; b++) {
+                        write.compact(partition, b, true);
+                    }
+                }
+            }
+            commit.commit(1, write.prepareCommit(true, 1));
+        } finally {
+            // release both resources even if writing or committing failed; the writer is closed
+            // first, and the committer is closed even if closing the writer throws
+            try {
+                write.close();
+            } finally {
+                commit.close();
+            }
+        }
+
+        HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(), tempDir.toString());
+        TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
+        org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier);
+        Map<String, String> actual = new HashMap<>();
+        // try-with-resources closes the iterable even if converting a record throws
+        try (CloseableIterable<Record> result = IcebergGenerics.read(icebergTable).build()) {
+            for (Record record : result) {
+                actual.put(icebergRecordToKey.apply(record), icebergRecordToValue.apply(record));
+            }
+        }
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    /**
+     * A row to write, together with the partition it is associated with and the key and value
+     * strings it is compared by.
+     */
+    private static class TestRecord {
+        // partition row, used to trigger compaction per partition
+        private final BinaryRow partition;
+        // textual form of the key and value, as stored in the expected map
+        private final String key;
+        private final String value;
+        // the row that is actually written to the table
+        private final GenericRow record;
+
+        private TestRecord(BinaryRow partition, String key, String value, GenericRow record) {
+            this.partition = partition;
+            this.key = key;
+            this.value = value;
+            this.record = record;
+        }
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
index 29e6eadcf..c390b1cbe 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
@@ -41,7 +41,9 @@ import org.apache.avro.file.DataFileWriter;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
@@ -57,6 +59,9 @@ public class AvroFileFormat extends FileFormat {
                     .defaultValue(SNAPPY_CODEC)
                     .withDescription("The compression codec for avro");
 
+    public static final ConfigOption<Map<String, String>> 
AVRO_ROW_NAME_MAPPING =
+            ConfigOptions.key("row-name-mapping").mapType().defaultValue(new 
HashMap<>());
+
     private final FormatContext context;
 
     public AvroFileFormat(FormatContext context) {
@@ -85,7 +90,7 @@ public class AvroFileFormat extends FileFormat {
     public void validateDataFields(RowType rowType) {
         List<DataType> fieldTypes = rowType.getFieldTypes();
         for (DataType dataType : fieldTypes) {
-            AvroSchemaConverter.convertToSchema(dataType);
+            AvroSchemaConverter.convertToSchema(dataType, new HashMap<>());
         }
     }
 
@@ -110,7 +115,10 @@ public class AvroFileFormat extends FileFormat {
             this.factory =
                     new AvroWriterFactory<>(
                             (out, compression) -> {
-                                Schema schema = 
AvroSchemaConverter.convertToSchema(rowType);
+                                Schema schema =
+                                        AvroSchemaConverter.convertToSchema(
+                                                rowType,
+                                                
context.formatOptions().get(AVRO_ROW_NAME_MAPPING));
                                 AvroRowDatumWriter datumWriter = new 
AvroRowDatumWriter(rowType);
                                 DataFileWriter<InternalRow> dataFileWriter =
                                         new DataFileWriter<>(datumWriter);
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
index bfca9006a..5abc98b26 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
@@ -35,6 +35,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 
 import java.util.List;
+import java.util.Map;
 
 /** Converts an Avro schema into Paimon's type information. */
 public class AvroSchemaConverter {
@@ -52,8 +53,8 @@ public class AvroSchemaConverter {
      *     nested type
      * @return Avro's {@link Schema} matching this logical type.
      */
-    public static Schema convertToSchema(DataType schema) {
-        return convertToSchema(schema, 
"org.apache.paimon.avro.generated.record");
+    public static Schema convertToSchema(DataType schema, Map<String, String> 
rowNameMapping) {
+        return convertToSchema(schema, 
"org.apache.paimon.avro.generated.record", rowNameMapping);
     }
 
     /**
@@ -66,7 +67,12 @@ public class AvroSchemaConverter {
      * @param rowName the record name
      * @return Avro's {@link Schema} matching this logical type.
      */
-    public static Schema convertToSchema(DataType dataType, String rowName) {
+    public static Schema convertToSchema(
+            DataType dataType, String rowName, Map<String, String> rowNameMapping) {
+        if (rowNameMapping.containsKey(rowName)) {
+            rowName = rowNameMapping.get(rowName);
+        }
+
         int precision;
         boolean nullable = dataType.isNullable();
         switch (dataType.getTypeRoot()) {
@@ -166,7 +172,11 @@ public class AvroSchemaConverter {
                     DataType fieldType = rowType.getTypeAt(i);
                     SchemaBuilder.GenericDefault<Schema> fieldBuilder =
                             builder.name(fieldName)
-                                    .type(convertToSchema(fieldType, rowName + "_" + fieldName));
+                                    .type(
+                                            convertToSchema(
+                                                    fieldType,
+                                                    rowName + "_" + fieldName,
+                                                    rowNameMapping));
 
                     if (fieldType.isNullable()) {
                         builder = fieldBuilder.withDefault(null);
@@ -183,14 +193,20 @@ public class AvroSchemaConverter {
                                 .map()
                                 .values(
                                         convertToSchema(
-                                                extractValueTypeToAvroMap(dataType), rowName));
+                                                extractValueTypeToAvroMap(dataType),
+                                                rowName,
+                                                rowNameMapping));
                 return nullable ? nullableSchema(map) : map;
             case ARRAY:
                 ArrayType arrayType = (ArrayType) dataType;
                 Schema array =
                         SchemaBuilder.builder()
                                 .array()
-                                .items(convertToSchema(arrayType.getElementType(), rowName));
+                                .items(
+                                        convertToSchema(
+                                                arrayType.getElementType(),
+                                                rowName,
+                                                rowNameMapping));
                 return nullable ? nullableSchema(array) : array;
             default:
                 throw new UnsupportedOperationException(

Reply via email to