This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f83ecf4dd [core] Introduce IcebergCommitCallback to create Iceberg
metadata after commit (#3731)
f83ecf4dd is described below
commit f83ecf4dd7c98cec37fe5e30bad69b77a5b38d7c
Author: tsreaper <[email protected]>
AuthorDate: Fri Jul 12 10:35:13 2024 +0800
[core] Introduce IcebergCommitCallback to create Iceberg metadata after
commit (#3731)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 12 +
paimon-core/pom.xml | 14 +
.../paimon/iceberg/IcebergCommitCallback.java | 248 +++++++++++++++
.../apache/paimon/iceberg/IcebergPathFactory.java | 93 ++++++
.../iceberg/manifest/IcebergConversions.java | 81 +++++
.../iceberg/manifest/IcebergDataFileMeta.java | 144 +++++++++
.../manifest/IcebergDataFileMetaSerializer.java | 61 ++++
.../iceberg/manifest/IcebergManifestEntry.java | 136 ++++++++
.../manifest/IcebergManifestEntrySerializer.java | 57 ++++
.../iceberg/manifest/IcebergManifestFile.java | 189 +++++++++++
.../iceberg/manifest/IcebergManifestFileMeta.java | 228 ++++++++++++++
.../IcebergManifestFileMetaSerializer.java | 94 ++++++
.../iceberg/manifest/IcebergManifestList.java | 48 +++
.../iceberg/manifest/IcebergPartitionSummary.java | 97 ++++++
.../IcebergPartitionSummarySerializer.java | 46 +++
.../paimon/iceberg/metadata/IcebergDataField.java | 159 ++++++++++
.../paimon/iceberg/metadata/IcebergMetadata.java | 317 +++++++++++++++++++
.../iceberg/metadata/IcebergPartitionField.java | 119 +++++++
.../iceberg/metadata/IcebergPartitionSpec.java | 88 ++++++
.../paimon/iceberg/metadata/IcebergSchema.java | 109 +++++++
.../paimon/iceberg/metadata/IcebergSnapshot.java | 130 ++++++++
.../iceberg/metadata/IcebergSnapshotSummary.java | 71 +++++
.../paimon/iceberg/metadata/IcebergSortOrder.java | 90 ++++++
.../paimon/table/AbstractFileStoreTable.java | 12 +-
.../paimon/iceberg/IcebergCompatibilityTest.java | 348 +++++++++++++++++++++
.../apache/paimon/format/avro/AvroFileFormat.java | 12 +-
.../paimon/format/avro/AvroSchemaConverter.java | 28 +-
28 files changed, 3027 insertions(+), 10 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index ed9fdb53c..81f80c0fd 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -417,6 +417,12 @@ Mainly to resolve data skew on primary keys. We recommend
starting with 64 mb wh
<td><p>Enum</p></td>
<td>Specify the merge engine for table with primary key.<br /><br
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last
row.</li><li>"partial-update": Partial update non-null
fields.</li><li>"aggregation": Aggregate fields with same primary
key.</li><li>"first-row": De-duplicate and keep the first row.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>metadata.iceberg-compatible</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>When set to true, produce Iceberg metadata after a snapshot is
committed, so that Iceberg readers can read Paimon's raw files.</td>
+ </tr>
<tr>
<td><h5>metadata.stats-mode</h5></td>
<td style="word-wrap: break-word;">"truncate(16)"</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 3c286ecd2..69c236b9d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1252,6 +1252,14 @@ public class CoreOptions implements Serializable {
+ ChangelogProducer.LOOKUP.name()
+ ", commit will wait for changelog
generation by lookup.");
+ public static final ConfigOption<Boolean> METADATA_ICEBERG_COMPATIBLE =
+ key("metadata.iceberg-compatible")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "When set to true, produce Iceberg metadata after
a snapshot is committed, "
+ + "so that Iceberg readers can read
Paimon's raw files.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -1977,6 +1985,10 @@ public class CoreOptions implements Serializable {
&& options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
}
+ public boolean metadataIcebergCompatible() {
+ return options.get(METADATA_ICEBERG_COMPATIBLE);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 533a1cd3d..a2591bc6b 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -190,6 +190,20 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <version>1.5.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-data</artifactId>
+ <version>1.5.2</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
new file mode 100644
index 000000000..95dae5569
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta;
+import org.apache.paimon.iceberg.manifest.IcebergManifestEntry;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta;
+import org.apache.paimon.iceberg.manifest.IcebergManifestList;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.iceberg.metadata.IcebergSchema;
+import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
+import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg
readers can read
+ * Paimon's {@link RawFile}.
+ */
+public class IcebergCommitCallback implements CommitCallback {
+
+ // see org.apache.iceberg.hadoop.Util
+ private static final String VERSION_HINT_FILENAME = "version-hint.text";
+
+ private final FileStoreTable table;
+ private final String commitUser;
+ private final IcebergPathFactory pathFactory;
+
+ private final IcebergManifestFile manifestFile;
+ private final IcebergManifestList manifestList;
+
+ public IcebergCommitCallback(FileStoreTable table, String commitUser) {
+ this.table = table;
+ this.commitUser = commitUser;
+ this.pathFactory = new IcebergPathFactory(table.location());
+
+ RowType partitionType = table.schema().logicalPartitionType();
+ RowType entryType = IcebergManifestEntry.schema(partitionType);
+ Options manifestFileAvroOptions = Options.fromMap(table.options());
+ //
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestReader.java
+ manifestFileAvroOptions.set(
+ "avro.row-name-mapping",
+ "org.apache.paimon.avro.generated.record:manifest_entry,"
+ + "manifest_entry_data_file:r2,"
+ + "r2_partition:r102");
+ FileFormat manifestFileAvro =
FileFormat.getFileFormat(manifestFileAvroOptions, "avro");
+ this.manifestFile =
+ new IcebergManifestFile(
+ table.fileIO(),
+ partitionType,
+ manifestFileAvro.createReaderFactory(entryType),
+ manifestFileAvro.createWriterFactory(entryType),
+ table.coreOptions().manifestCompression(),
+ pathFactory.manifestFileFactory(),
+ table.coreOptions().manifestTargetSize());
+
+ Options manifestListAvroOptions = Options.fromMap(table.options());
+ //
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestLists.java
+ manifestListAvroOptions.set(
+ "avro.row-name-mapping",
+ "org.apache.paimon.avro.generated.record:manifest_file,"
+ + "manifest_file_partitions:r508");
+ FileFormat manifestListAvro =
FileFormat.getFileFormat(manifestListAvroOptions, "avro");
+ this.manifestList =
+ new IcebergManifestList(
+ table.fileIO(),
+
manifestListAvro.createReaderFactory(IcebergManifestFileMeta.schema()),
+
manifestListAvro.createWriterFactory(IcebergManifestFileMeta.schema()),
+ table.coreOptions().manifestCompression(),
+ pathFactory.manifestListFactory());
+ }
+
+ @Override
+ public void call(List<ManifestCommittable> committables) {
+ for (ManifestCommittable committable : committables) {
+ try {
+ commitMetadata(committable);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+
+ private void commitMetadata(ManifestCommittable committable) throws
IOException {
+ Pair<Long, Long> pair =
getCurrentAndBaseSnapshotIds(committable.identifier());
+ long currentSnapshot = pair.getLeft();
+ Long baseSnapshot = pair.getRight();
+
+ createMetadataWithoutBase(currentSnapshot);
+ }
+
+ private Pair<Long, Long> getCurrentAndBaseSnapshotIds(long
commitIdentifier) {
+ SnapshotManager snapshotManager = table.snapshotManager();
+ List<Snapshot> currentSnapshots =
+ snapshotManager.findSnapshotsForIdentifiers(
+ commitUser,
Collections.singletonList(commitIdentifier));
+ Preconditions.checkArgument(
+ currentSnapshots.size() == 1,
+ "Cannot find snapshot with user {} and identifier {}",
+ commitUser,
+ commitIdentifier);
+ long currentSnapshotId = currentSnapshots.get(0).id();
+
+ long earliest =
+ Preconditions.checkNotNull(
+ snapshotManager.earliestSnapshotId(),
+ "Cannot determine earliest snapshot ID. This is
unexpected.");
+ Long baseSnapshotId = null;
+ for (long id = currentSnapshotId - 1; id >= earliest; id--) {
+ try {
+ Snapshot snapshot = snapshotManager.snapshot(id);
+ if (!snapshot.commitUser().equals(commitUser)
+ || snapshot.commitIdentifier() < commitIdentifier) {
+ baseSnapshotId = id;
+ break;
+ }
+ } catch (Exception ignore) {
+ break;
+ }
+ }
+
+ return Pair.of(currentSnapshotId, baseSnapshotId);
+ }
+
+ private void createMetadataWithoutBase(long snapshotId) throws IOException
{
+ SnapshotReader snapshotReader =
table.newSnapshotReader().withSnapshot(snapshotId);
+ Iterator<IcebergManifestEntry> entryIterator =
+ snapshotReader.read().dataSplits().stream()
+ .filter(DataSplit::rawConvertible)
+ .flatMap(s -> dataSplitToDataFileMeta(s).stream())
+ .map(
+ m ->
+ new IcebergManifestEntry(
+
IcebergManifestEntry.Status.ADDED,
+ snapshotId,
+ snapshotId,
+ snapshotId,
+ m))
+ .iterator();
+ List<IcebergManifestFileMeta> manifestFileMetas =
+ manifestFile.rollingWrite(entryIterator, snapshotId);
+ String manifestListFileName =
manifestList.writeWithoutRolling(manifestFileMetas);
+
+ List<IcebergPartitionField> partitionFields =
+ getPartitionFields(table.schema().logicalPartitionType());
+ int schemaId = (int) table.schema().id();
+ IcebergSnapshot snapshot =
+ new IcebergSnapshot(
+ snapshotId,
+ snapshotId,
+ System.currentTimeMillis(),
+ new
IcebergSnapshotSummary(IcebergSnapshotSummary.OPERATION_APPEND),
+
pathFactory.toManifestListPath(manifestListFileName).toString(),
+ schemaId);
+
+ String tableUuid = UUID.randomUUID().toString();
+ IcebergMetadata metadata =
+ new IcebergMetadata(
+ tableUuid,
+ table.location().toString(),
+ snapshotId,
+ table.schema().highestFieldId(),
+ Collections.singletonList(new
IcebergSchema(table.schema())),
+ schemaId,
+ Collections.singletonList(new
IcebergPartitionSpec(partitionFields)),
+ partitionFields.stream()
+ .mapToInt(IcebergPartitionField::fieldId)
+ .max()
+ .orElse(
+ // not sure why, this is a result
tested by hand
+ IcebergPartitionField.FIRST_FIELD_ID -
1),
+ Collections.singletonList(snapshot),
+ (int) snapshotId);
+
table.fileIO().tryToWriteAtomic(pathFactory.toMetadataPath(snapshotId),
metadata.toJson());
+ table.fileIO()
+ .overwriteFileUtf8(
+ new Path(pathFactory.metadataDirectory(),
VERSION_HINT_FILENAME),
+ String.valueOf(snapshotId));
+ }
+
+ private List<IcebergDataFileMeta> dataSplitToDataFileMeta(DataSplit
dataSplit) {
+ List<IcebergDataFileMeta> result = new ArrayList<>();
+ for (RawFile rawFile : dataSplit.convertToRawFiles().get()) {
+ result.add(
+ new IcebergDataFileMeta(
+ IcebergDataFileMeta.Content.DATA,
+ rawFile.path(),
+ rawFile.format(),
+ dataSplit.partition(),
+ rawFile.rowCount(),
+ rawFile.fileSize()));
+ }
+ return result;
+ }
+
+ private List<IcebergPartitionField> getPartitionFields(RowType
partitionType) {
+ List<IcebergPartitionField> result = new ArrayList<>();
+ int fieldId = IcebergPartitionField.FIRST_FIELD_ID;
+ for (DataField field : partitionType.getFields()) {
+ result.add(new IcebergPartitionField(field, fieldId));
+ fieldId++;
+ }
+ return result;
+ }
+
+ @Override
+ public void close() throws Exception {}
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
new file mode 100644
index 000000000..8b5680952
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.PathFactory;
+
+import java.util.UUID;
+
+/** Path factory for Iceberg metadata files. */
+public class IcebergPathFactory {
+
+ private final Path root;
+ private final String uuid;
+
+ private int manifestFileCount;
+ private int manifestListCount;
+
+ public IcebergPathFactory(Path root) {
+ this.root = root;
+ this.uuid = UUID.randomUUID().toString();
+ }
+
+ public Path metadataDirectory() {
+ return new Path(root, "metadata");
+ }
+
+ public Path newManifestFile() {
+ manifestFileCount++;
+ return toManifestFilePath(uuid + "-m" + manifestFileCount + ".avro");
+ }
+
+ public Path toManifestFilePath(String manifestFileName) {
+ return new Path(metadataDirectory(), manifestFileName);
+ }
+
+ public Path newManifestListFile() {
+ manifestListCount++;
+ return toManifestListPath("snap-" + manifestListCount + "-" + uuid +
".avro");
+ }
+
+ public Path toManifestListPath(String manifestListName) {
+ return new Path(metadataDirectory(), manifestListName);
+ }
+
+ public Path toMetadataPath(long snapshotId) {
+ return new Path(metadataDirectory(),
String.format("v%d.metadata.json", snapshotId));
+ }
+
+ public PathFactory manifestFileFactory() {
+ return new PathFactory() {
+ @Override
+ public Path newPath() {
+ return newManifestFile();
+ }
+
+ @Override
+ public Path toPath(String fileName) {
+ return toManifestFilePath(fileName);
+ }
+ };
+ }
+
+ public PathFactory manifestListFactory() {
+ return new PathFactory() {
+ @Override
+ public Path newPath() {
+ return newManifestListFile();
+ }
+
+ @Override
+ public Path toPath(String fileName) {
+ return toManifestListPath(fileName);
+ }
+ };
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
new file mode 100644
index 000000000..fb83dd52c
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.types.DataType;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Conversions between Java object and bytes.
+ *
+ * <p>See <a
href="https://iceberg.apache.org/spec/#binary-single-value-serialization">Iceberg
+ * spec</a>.
+ */
+public class IcebergConversions {
+
+ private IcebergConversions() {}
+
+ private static final ThreadLocal<CharsetEncoder> ENCODER =
+ ThreadLocal.withInitial(StandardCharsets.UTF_8::newEncoder);
+
+ public static ByteBuffer toByteBuffer(DataType type, Object value) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ return ByteBuffer.allocate(1).put(0, (Boolean) value ? (byte)
0x01 : (byte) 0x00);
+ case INTEGER:
+ case DATE:
+ return
ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(0, (int) value);
+ case BIGINT:
+ return ByteBuffer.allocate(8)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .putLong(0, (long) value);
+ case FLOAT:
+ return ByteBuffer.allocate(4)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .putFloat(0, (float) value);
+ case DOUBLE:
+ return ByteBuffer.allocate(8)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .putDouble(0, (double) value);
+ case CHAR:
+ case VARCHAR:
+ CharBuffer buffer = CharBuffer.wrap(value.toString());
+ try {
+ return ENCODER.get().encode(buffer);
+ } catch (CharacterCodingException e) {
+ throw new RuntimeException("Failed to encode value as
UTF-8: " + value, e);
+ }
+ case BINARY:
+ case VARBINARY:
+ return ByteBuffer.wrap((byte[]) value);
+ case DECIMAL:
+ Decimal decimal = (Decimal) value;
+ return ByteBuffer.wrap((decimal.toUnscaledBytes()));
+ default:
+ throw new UnsupportedOperationException("Cannot serialize
type: " + type);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
new file mode 100644
index 000000000..292d8488d
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Iceberg data file meta.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#manifests">Iceberg
spec</a>.
+ */
+public class IcebergDataFileMeta {
+
+ /** See Iceberg <code>data_file</code> struct <code>content</code> field.
*/
+ public enum Content {
+ DATA(0),
+ POSITION_DELETES(1),
+ EQUALITY_DELETES(2);
+
+ private final int id;
+
+ Content(int id) {
+ this.id = id;
+ }
+
+ public int id() {
+ return id;
+ }
+
+ public static Content fromId(int id) {
+ switch (id) {
+ case 0:
+ return DATA;
+ case 1:
+ return POSITION_DELETES;
+ case 2:
+ return EQUALITY_DELETES;
+ }
+ throw new IllegalArgumentException("Unknown manifest content: " +
id);
+ }
+ }
+
+ private final Content content;
+ private final String filePath;
+ private final String fileFormat;
+ private final BinaryRow partition;
+ private final long recordCount;
+ private final long fileSizeInBytes;
+
+ public IcebergDataFileMeta(
+ Content content,
+ String filePath,
+ String fileFormat,
+ BinaryRow partition,
+ long recordCount,
+ long fileSizeInBytes) {
+ this.content = content;
+ this.filePath = filePath;
+ this.fileFormat = fileFormat;
+ this.partition = partition;
+ this.recordCount = recordCount;
+ this.fileSizeInBytes = fileSizeInBytes;
+ }
+
+ public Content content() {
+ return content;
+ }
+
+ public String filePath() {
+ return filePath;
+ }
+
+ public String fileFormat() {
+ return fileFormat;
+ }
+
+ public BinaryRow partition() {
+ return partition;
+ }
+
+ public long recordCount() {
+ return recordCount;
+ }
+
+ public long fileSizeInBytes() {
+ return fileSizeInBytes;
+ }
+
+ public static RowType schema(RowType partitionType) {
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(134, "content", DataTypes.INT().notNull()));
+ fields.add(new DataField(100, "file_path",
DataTypes.STRING().notNull()));
+ fields.add(new DataField(101, "file_format",
DataTypes.STRING().notNull()));
+ fields.add(new DataField(102, "partition", partitionType));
+ fields.add(new DataField(103, "record_count",
DataTypes.BIGINT().notNull()));
+ fields.add(new DataField(104, "file_size_in_bytes",
DataTypes.BIGINT().notNull()));
+ return new RowType(fields);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IcebergDataFileMeta that = (IcebergDataFileMeta) o;
+ return content == that.content
+ && recordCount == that.recordCount
+ && fileSizeInBytes == that.fileSizeInBytes
+ && Objects.equals(filePath, that.filePath)
+ && Objects.equals(fileFormat, that.fileFormat)
+ && Objects.equals(partition, that.partition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(content, filePath, fileFormat, partition,
recordCount, fileSizeInBytes);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
new file mode 100644
index 000000000..b4aa281e6
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectSerializer;
+
+/** Serializer for {@link IcebergDataFileMeta}. */
+public class IcebergDataFileMetaSerializer extends
ObjectSerializer<IcebergDataFileMeta> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final InternalRowSerializer partSerializer;
+
+ public IcebergDataFileMetaSerializer(RowType partitionType) {
+ super(IcebergDataFileMeta.schema(partitionType));
+ this.partSerializer = new InternalRowSerializer(partitionType);
+ }
+
+ @Override
+ public InternalRow toRow(IcebergDataFileMeta file) {
+ return GenericRow.of(
+ file.content().id(),
+ BinaryString.fromString(file.filePath()),
+ BinaryString.fromString(file.fileFormat()),
+ file.partition(),
+ file.recordCount(),
+ file.fileSizeInBytes());
+ }
+
+ @Override
+ public IcebergDataFileMeta fromRow(InternalRow row) {
+ return new IcebergDataFileMeta(
+ IcebergDataFileMeta.Content.fromId(row.getInt(0)),
+ row.getString(1).toString(),
+ row.getString(2).toString(),
+ partSerializer.toBinaryRow(row.getRow(3,
partSerializer.getArity())).copy(),
+ row.getLong(4),
+ row.getLong(5));
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java
new file mode 100644
index 000000000..6b76fb757
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Entry of an Iceberg manifest file.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#manifests">Iceberg
spec</a>.
+ */
+public class IcebergManifestEntry {
+
+ /** See Iceberg <code>manifest_entry</code> struct <code>status</code>
field. */
+ public enum Status {
+ EXISTING(0),
+ ADDED(1),
+ DELETED(2);
+
+ private final int id;
+
+ Status(int id) {
+ this.id = id;
+ }
+
+ public int id() {
+ return id;
+ }
+
+ public static Status fromId(int id) {
+ switch (id) {
+ case 0:
+ return EXISTING;
+ case 1:
+ return ADDED;
+ case 2:
+ return DELETED;
+ }
+ throw new IllegalArgumentException("Unknown manifest content: " +
id);
+ }
+ }
+
+ private final Status status;
+ private final long snapshotId;
+ // sequenceNumber indicates when the records in the data files are
written. It might be smaller
+ // than fileSequenceNumber.
+ // For example, when a file with sequenceNumber 3 and another file with
sequenceNumber 5 are
+ // compacted into one file during snapshot 6, the compacted file will have
sequenceNumber =
+ // max(3, 5) = 5, and fileSequenceNumber = 6.
+ private final long sequenceNumber;
+ private final long fileSequenceNumber;
+ private final IcebergDataFileMeta dataFile;
+
+ public IcebergManifestEntry(
+ Status status,
+ long snapshotId,
+ long sequenceNumber,
+ long fileSequenceNumber,
+ IcebergDataFileMeta dataFile) {
+ this.status = status;
+ this.snapshotId = snapshotId;
+ this.sequenceNumber = sequenceNumber;
+ this.fileSequenceNumber = fileSequenceNumber;
+ this.dataFile = dataFile;
+ }
+
+ public Status status() {
+ return status;
+ }
+
+ public long snapshotId() {
+ return snapshotId;
+ }
+
+ public long sequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public long fileSequenceNumber() {
+ return fileSequenceNumber;
+ }
+
+ public IcebergDataFileMeta file() {
+ return dataFile;
+ }
+
+ public static RowType schema(RowType partitionType) {
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "status", DataTypes.INT().notNull()));
+ fields.add(new DataField(1, "snapshot_id", DataTypes.BIGINT()));
+ fields.add(new DataField(3, "sequence_number", DataTypes.BIGINT()));
+ fields.add(new DataField(4, "file_sequence_number",
DataTypes.BIGINT()));
+ fields.add(
+ new DataField(2, "data_file",
IcebergDataFileMeta.schema(partitionType).notNull()));
+ return new RowType(fields);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IcebergManifestEntry that = (IcebergManifestEntry) o;
+ return status == that.status && Objects.equals(dataFile,
that.dataFile);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, dataFile);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
new file mode 100644
index 000000000..d93456c3f
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectSerializer;
+
+/** Serializer for {@link IcebergManifestEntry}. */
+public class IcebergManifestEntrySerializer extends
ObjectSerializer<IcebergManifestEntry> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final IcebergDataFileMetaSerializer fileSerializer;
+
+ public IcebergManifestEntrySerializer(RowType partitionType) {
+ super(IcebergManifestEntry.schema(partitionType));
+ this.fileSerializer = new IcebergDataFileMetaSerializer(partitionType);
+ }
+
+ @Override
+ public InternalRow toRow(IcebergManifestEntry entry) {
+ return GenericRow.of(
+ entry.status().id(),
+ entry.snapshotId(),
+ entry.sequenceNumber(),
+ entry.fileSequenceNumber(),
+ fileSerializer.toRow(entry.file()));
+ }
+
+ @Override
+ public IcebergManifestEntry fromRow(InternalRow row) {
+ return new IcebergManifestEntry(
+ IcebergManifestEntry.Status.fromId(row.getInt(0)),
+ row.getLong(1),
+ row.getLong(2),
+ row.getLong(3),
+ fileSerializer.fromRow(row.getRow(4,
fileSerializer.numFields())));
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
new file mode 100644
index 000000000..d4c363b4c
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleStatsCollector;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.SingleFileWriter;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.PathFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static
org.apache.paimon.iceberg.manifest.IcebergConversions.toByteBuffer;
+
+/**
+ * This file includes several Iceberg {@link ManifestEntry}s, representing the
additional changes
+ * since last snapshot.
+ */
+public class IcebergManifestFile extends ObjectsFile<IcebergManifestEntry> {
+
+ private static final long UNASSIGNED_SEQ = -1L;
+
+ private final RowType partitionType;
+ private final FormatWriterFactory writerFactory;
+ private final MemorySize targetFileSize;
+
+ public IcebergManifestFile(
+ FileIO fileIO,
+ RowType partitionType,
+ FormatReaderFactory readerFactory,
+ FormatWriterFactory writerFactory,
+ String compression,
+ PathFactory pathFactory,
+ MemorySize targetFileSize) {
+ super(
+ fileIO,
+ new IcebergManifestEntrySerializer(partitionType),
+ readerFactory,
+ writerFactory,
+ compression,
+ pathFactory,
+ null);
+ this.partitionType = partitionType;
+ this.writerFactory = writerFactory;
+ this.targetFileSize = targetFileSize;
+ }
+
+ public List<IcebergManifestFileMeta> rollingWrite(
+ Iterator<IcebergManifestEntry> entries, long sequenceNumber)
throws IOException {
+ RollingFileWriter<IcebergManifestEntry, IcebergManifestFileMeta>
writer =
+ new RollingFileWriter<>(
+ () -> createWriter(sequenceNumber),
targetFileSize.getBytes());
+ try {
+ writer.write(entries);
+ writer.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return writer.result();
+ }
+
+ public SingleFileWriter<IcebergManifestEntry, IcebergManifestFileMeta>
createWriter(
+ long sequenceNumber) {
+ return new IcebergManifestEntryWriter(
+ writerFactory,
+ pathFactory.newPath(),
+ CoreOptions.FILE_COMPRESSION.defaultValue(),
+ sequenceNumber);
+ }
+
+ private class IcebergManifestEntryWriter
+ extends SingleFileWriter<IcebergManifestEntry,
IcebergManifestFileMeta> {
+
+ private final SimpleStatsCollector partitionStatsCollector;
+ private final long sequenceNumber;
+
+ private int addedFilesCount = 0;
+ private int existingFilesCount = 0;
+ private int deletedFilesCount = 0;
+ private long addedRowsCount = 0;
+ private long existingRowsCount = 0;
+ private long deletedRowsCount = 0;
+ private Long minSequenceNumber = null;
+
+ IcebergManifestEntryWriter(
+ FormatWriterFactory factory,
+ Path path,
+ String fileCompression,
+ long sequenceNumber) {
+ super(
+ IcebergManifestFile.this.fileIO,
+ factory,
+ path,
+ serializer::toRow,
+ fileCompression);
+ this.partitionStatsCollector = new
SimpleStatsCollector(partitionType);
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ @Override
+ public void write(IcebergManifestEntry entry) throws IOException {
+ super.write(entry);
+
+ switch (entry.status()) {
+ case ADDED:
+ addedFilesCount += 1;
+ addedRowsCount += entry.file().recordCount();
+ break;
+ case EXISTING:
+ existingFilesCount += 1;
+ existingRowsCount += entry.file().recordCount();
+ break;
+ case DELETED:
+ deletedFilesCount += 1;
+ deletedRowsCount += entry.file().recordCount();
+ break;
+ }
+
+ if (minSequenceNumber == null || minSequenceNumber >
entry.sequenceNumber()) {
+ minSequenceNumber = entry.sequenceNumber();
+ }
+
+ partitionStatsCollector.collect(entry.file().partition());
+ }
+
+ @Override
+ public IcebergManifestFileMeta result() throws IOException {
+ SimpleColStats[] stats = partitionStatsCollector.extract();
+ List<IcebergPartitionSummary> partitionSummaries = new
ArrayList<>();
+ for (int i = 0; i < stats.length; i++) {
+ SimpleColStats fieldStats = stats[i];
+ DataType type = partitionType.getTypeAt(i);
+ partitionSummaries.add(
+ new IcebergPartitionSummary(
+ Objects.requireNonNull(fieldStats.nullCount())
> 0,
+ false, // TODO correct it?
+ toByteBuffer(type, fieldStats.min()).array(),
+ toByteBuffer(type, fieldStats.max()).array()));
+ }
+ return new IcebergManifestFileMeta(
+ path.toString(),
+ fileIO.getFileSize(path),
+ IcebergPartitionSpec.SPEC_ID,
+ Content.DATA,
+ sequenceNumber,
+ minSequenceNumber != null ? minSequenceNumber :
UNASSIGNED_SEQ,
+ sequenceNumber,
+ addedFilesCount,
+ existingFilesCount,
+ deletedFilesCount,
+ addedRowsCount,
+ existingRowsCount,
+ deletedRowsCount,
+ partitionSummaries);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
new file mode 100644
index 000000000..571b24960
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Metadata of an Iceberg manifest file.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#manifest-lists">Iceberg
spec</a>.
+ */
public class IcebergManifestFileMeta {

    /** Content type stored in a manifest file. */
    public enum Content {
        DATA(0),
        DELETES(1);

        private final int id;

        Content(int id) {
            this.id = id;
        }

        /** Numeric id stored in the manifest list, as defined by the Iceberg spec. */
        public int id() {
            return id;
        }

        /** Inverse of {@link #id()}; throws for unknown ids. */
        public static Content fromId(int id) {
            switch (id) {
                case 0:
                    return DATA;
                case 1:
                    return DELETES;
            }
            throw new IllegalArgumentException("Unknown manifest content: " + id);
        }
    }

    // Location and length of the manifest file.
    private final String manifestPath;
    private final long manifestLength;
    // Id of the partition spec used to write the manifest.
    private final int partitionSpecId;
    private final Content content;
    // Sequence numbers covered by this manifest (see Iceberg spec, "Manifest Lists").
    private final long sequenceNumber;
    private final long minSequenceNumber;
    // Snapshot in which this manifest was added.
    private final long addedSnapshotId;
    // File and row counters aggregated while writing the manifest.
    private final int addedFilesCount;
    private final int existingFilesCount;
    private final int deletedFilesCount;
    private final long addedRowsCount;
    private final long existingRowsCount;
    private final long deletedRowsCount;
    // Per-partition-field value summaries (null/NaN flags and value bounds).
    private final List<IcebergPartitionSummary> partitions;

    public IcebergManifestFileMeta(
            String manifestPath,
            long manifestLength,
            int partitionSpecId,
            Content content,
            long sequenceNumber,
            long minSequenceNumber,
            long addedSnapshotId,
            int addedFilesCount,
            int existingFilesCount,
            int deletedFilesCount,
            long addedRowsCount,
            long existingRowsCount,
            long deletedRowsCount,
            List<IcebergPartitionSummary> partitions) {
        this.manifestPath = manifestPath;
        this.manifestLength = manifestLength;
        this.partitionSpecId = partitionSpecId;
        this.content = content;
        this.sequenceNumber = sequenceNumber;
        this.minSequenceNumber = minSequenceNumber;
        this.addedSnapshotId = addedSnapshotId;
        this.addedFilesCount = addedFilesCount;
        this.existingFilesCount = existingFilesCount;
        this.deletedFilesCount = deletedFilesCount;
        this.addedRowsCount = addedRowsCount;
        this.existingRowsCount = existingRowsCount;
        this.deletedRowsCount = deletedRowsCount;
        this.partitions = partitions;
    }

    public String manifestPath() {
        return manifestPath;
    }

    public long manifestLength() {
        return manifestLength;
    }

    public int partitionSpecId() {
        return partitionSpecId;
    }

    public Content content() {
        return content;
    }

    public long sequenceNumber() {
        return sequenceNumber;
    }

    public long minSequenceNumber() {
        return minSequenceNumber;
    }

    public long addedSnapshotId() {
        return addedSnapshotId;
    }

    public int addedFilesCount() {
        return addedFilesCount;
    }

    public int existingFilesCount() {
        return existingFilesCount;
    }

    public int deletedFilesCount() {
        return deletedFilesCount;
    }

    public long addedRowsCount() {
        return addedRowsCount;
    }

    public long existingRowsCount() {
        return existingRowsCount;
    }

    public long deletedRowsCount() {
        return deletedRowsCount;
    }

    public List<IcebergPartitionSummary> partitions() {
        return partitions;
    }

    /**
     * Row schema of a manifest-list entry.
     *
     * <p>The field IDs (500-518) are fixed by the Iceberg spec and intentionally not
     * contiguous or in declaration order; do not renumber them.
     */
    public static RowType schema() {
        List<DataField> fields = new ArrayList<>();
        fields.add(new DataField(500, "manifest_path", DataTypes.STRING().notNull()));
        fields.add(new DataField(501, "manifest_length", DataTypes.BIGINT().notNull()));
        fields.add(new DataField(502, "partition_spec_id", DataTypes.INT().notNull()));
        fields.add(new DataField(517, "content", DataTypes.INT().notNull()));
        fields.add(new DataField(515, "sequence_number", DataTypes.BIGINT().notNull()));
        fields.add(new DataField(516, "min_sequence_number", DataTypes.BIGINT().notNull()));
        fields.add(new DataField(503, "added_snapshot_id", DataTypes.BIGINT()));
        fields.add(new DataField(504, "added_files_count", DataTypes.INT().notNull()));
        fields.add(new DataField(505, "existing_files_count", DataTypes.INT().notNull()));
        fields.add(new DataField(506, "deleted_files_count", DataTypes.INT().notNull()));
        fields.add(new DataField(512, "added_rows_count", DataTypes.BIGINT().notNull()));
        fields.add(new DataField(513, "existing_rows_count", DataTypes.BIGINT().notNull()));
        fields.add(new DataField(514, "deleted_rows_count", DataTypes.BIGINT().notNull()));
        fields.add(
                new DataField(
                        508, "partitions", DataTypes.ARRAY(IcebergPartitionSummary.schema())));
        return new RowType(fields);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        IcebergManifestFileMeta that = (IcebergManifestFileMeta) o;
        return Objects.equals(manifestPath, that.manifestPath)
                && manifestLength == that.manifestLength
                && partitionSpecId == that.partitionSpecId
                && content == that.content
                && sequenceNumber == that.sequenceNumber
                && minSequenceNumber == that.minSequenceNumber
                && addedSnapshotId == that.addedSnapshotId
                && addedFilesCount == that.addedFilesCount
                && existingFilesCount == that.existingFilesCount
                && deletedFilesCount == that.deletedFilesCount
                && addedRowsCount == that.addedRowsCount
                && existingRowsCount == that.existingRowsCount
                && deletedRowsCount == that.deletedRowsCount
                && Objects.equals(partitions, that.partitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(
                manifestPath,
                manifestLength,
                partitionSpecId,
                content,
                sequenceNumber,
                minSequenceNumber,
                addedSnapshotId,
                addedFilesCount,
                existingFilesCount,
                deletedFilesCount,
                addedRowsCount,
                existingRowsCount,
                deletedRowsCount,
                partitions);
    }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
new file mode 100644
index 000000000..c40a26e8f
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content;
+import org.apache.paimon.utils.ObjectSerializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Serializer for {@link IcebergManifestFileMeta}. */
+public class IcebergManifestFileMetaSerializer extends
ObjectSerializer<IcebergManifestFileMeta> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final IcebergPartitionSummarySerializer partitionSummarySerializer;
+
+ public IcebergManifestFileMetaSerializer() {
+ super(IcebergManifestFileMeta.schema());
+ this.partitionSummarySerializer = new
IcebergPartitionSummarySerializer();
+ }
+
+ @Override
+ public InternalRow toRow(IcebergManifestFileMeta file) {
+ return GenericRow.of(
+ BinaryString.fromString(file.manifestPath()),
+ file.manifestLength(),
+ file.partitionSpecId(),
+ file.content().id(),
+ file.sequenceNumber(),
+ file.minSequenceNumber(),
+ file.addedSnapshotId(),
+ file.addedFilesCount(),
+ file.existingFilesCount(),
+ file.deletedFilesCount(),
+ file.addedRowsCount(),
+ file.existingRowsCount(),
+ file.deletedRowsCount(),
+ new GenericArray(
+ file.partitions().stream()
+ .map(partitionSummarySerializer::toRow)
+ .toArray(InternalRow[]::new)));
+ }
+
+ @Override
+ public IcebergManifestFileMeta fromRow(InternalRow row) {
+ return new IcebergManifestFileMeta(
+ row.getString(0).toString(),
+ row.getLong(1),
+ row.getInt(2),
+ Content.fromId(row.getInt(3)),
+ row.getLong(4),
+ row.getLong(5),
+ row.getLong(6),
+ row.getInt(7),
+ row.getInt(8),
+ row.getInt(9),
+ row.getLong(10),
+ row.getLong(11),
+ row.getLong(12),
+ toPartitionSummaries(row.getArray(13)));
+ }
+
+ private List<IcebergPartitionSummary> toPartitionSummaries(InternalArray
array) {
+ List<IcebergPartitionSummary> summaries = new ArrayList<>();
+ for (int i = 0; i < array.size(); i++) {
+ summaries.add(
+ partitionSummarySerializer.fromRow(
+ array.getRow(i,
partitionSummarySerializer.numFields())));
+ }
+ return summaries;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
new file mode 100644
index 000000000..e247b0238
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.PathFactory;
+
/**
 * Iceberg's manifest list file, containing the {@link IcebergManifestFileMeta}s of all manifest
 * files that make up a snapshot.
 *
 * <p>See <a href="https://iceberg.apache.org/spec/#manifest-lists">Iceberg spec</a>.
 */
public class IcebergManifestList extends ObjectsFile<IcebergManifestFileMeta> {

    public IcebergManifestList(
            FileIO fileIO,
            FormatReaderFactory readerFactory,
            FormatWriterFactory writerFactory,
            String compression,
            PathFactory pathFactory) {
        super(
                fileIO,
                new IcebergManifestFileMetaSerializer(),
                readerFactory,
                writerFactory,
                compression,
                pathFactory,
                null); // no object cache for manifest list contents
    }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java
new file mode 100644
index 000000000..e47a8fe00
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Iceberg partition summary stored in manifest file.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#manifest-lists">Iceberg
spec</a>.
+ */
+public class IcebergPartitionSummary {
+
+ private final boolean containsNull;
+ private final boolean containsNan;
+ private final byte[] lowerBound;
+ private final byte[] upperBound;
+
+ public IcebergPartitionSummary(
+ boolean containsNull, boolean containsNan, byte[] lowerBound,
byte[] upperBound) {
+ this.containsNull = containsNull;
+ this.containsNan = containsNan;
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ }
+
+ public boolean containsNull() {
+ return containsNull;
+ }
+
+ public boolean containsNan() {
+ return containsNan;
+ }
+
+ public byte[] lowerBound() {
+ return lowerBound;
+ }
+
+ public byte[] upperBound() {
+ return upperBound;
+ }
+
+ public static RowType schema() {
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(509, "contains_null",
DataTypes.BOOLEAN().notNull()));
+ fields.add(new DataField(518, "contains_nan", DataTypes.BOOLEAN()));
+ fields.add(new DataField(510, "lower_bound", DataTypes.BYTES()));
+ fields.add(new DataField(511, "upper_bound", DataTypes.BYTES()));
+ return (RowType) new RowType(fields).notNull();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IcebergPartitionSummary that = (IcebergPartitionSummary) o;
+ return containsNull == that.containsNull
+ && containsNan == that.containsNan
+ && Arrays.equals(lowerBound, that.lowerBound)
+ && Arrays.equals(upperBound, that.upperBound);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(containsNull, containsNan);
+ result = 31 * result + Arrays.hashCode(lowerBound);
+ result = 31 * result + Arrays.hashCode(upperBound);
+ return result;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java
new file mode 100644
index 000000000..2ef6ceb1c
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.ObjectSerializer;
+
+/** Serializer for {@link IcebergPartitionSummary}. */
+public class IcebergPartitionSummarySerializer extends
ObjectSerializer<IcebergPartitionSummary> {
+
+ public IcebergPartitionSummarySerializer() {
+ super(IcebergPartitionSummary.schema());
+ }
+
+ @Override
+ public InternalRow toRow(IcebergPartitionSummary record) {
+ return GenericRow.of(
+ record.containsNull(),
+ record.containsNan(),
+ record.lowerBound(),
+ record.upperBound());
+ }
+
+ @Override
+ public IcebergPartitionSummary fromRow(InternalRow row) {
+ return new IcebergPartitionSummary(
+ row.getBoolean(0), row.getBoolean(1), row.getBinary(2),
row.getBinary(3));
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
new file mode 100644
index 000000000..fd05183b6
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DecimalType;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * {@link DataField} in Iceberg.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#schemas">Iceberg spec</a>.
+ */
@JsonIgnoreProperties(ignoreUnknown = true)
public class IcebergDataField {

    // JSON property names, see "Schemas" in the Iceberg spec.
    private static final String FIELD_ID = "id";
    private static final String FIELD_NAME = "name";
    private static final String FIELD_REQUIRED = "required";
    private static final String FIELD_TYPE = "type";
    private static final String FIELD_DOC = "doc";

    @JsonProperty(FIELD_ID)
    private final int id;

    @JsonProperty(FIELD_NAME)
    private final String name;

    @JsonProperty(FIELD_REQUIRED)
    private final boolean required;

    // Iceberg primitive type name, e.g. "long", "string"; see toTypeString.
    @JsonProperty(FIELD_TYPE)
    private final String type;

    @JsonProperty(FIELD_DOC)
    private final String doc;

    /** Converts a Paimon {@link DataField} into its Iceberg schema representation. */
    public IcebergDataField(DataField dataField) {
        this(
                dataField.id(),
                dataField.name(),
                // Iceberg "required" is the inverse of Paimon nullability.
                !dataField.type().isNullable(),
                toTypeString(dataField.type()),
                dataField.description());
    }

    @JsonCreator
    public IcebergDataField(
            @JsonProperty(FIELD_ID) int id,
            @JsonProperty(FIELD_NAME) String name,
            @JsonProperty(FIELD_REQUIRED) boolean required,
            @JsonProperty(FIELD_TYPE) String type,
            @JsonProperty(FIELD_DOC) String doc) {
        this.id = id;
        this.name = name;
        this.required = required;
        this.type = type;
        this.doc = doc;
    }

    @JsonGetter(FIELD_ID)
    public int id() {
        return id;
    }

    @JsonGetter(FIELD_NAME)
    public String name() {
        return name;
    }

    @JsonGetter(FIELD_REQUIRED)
    public boolean required() {
        return required;
    }

    @JsonGetter(FIELD_TYPE)
    public String type() {
        return type;
    }

    @JsonGetter(FIELD_DOC)
    public String doc() {
        return doc;
    }

    /**
     * Maps a Paimon type to the corresponding Iceberg primitive type name.
     *
     * <p>NOTE(review): TIMESTAMP, TIME and the small integer types currently fall
     * through to the unsupported branch — presumably to be added later; confirm
     * which Paimon types the Iceberg compatibility layer is expected to cover.
     */
    private static String toTypeString(DataType dataType) {
        switch (dataType.getTypeRoot()) {
            case BOOLEAN:
                return "boolean";
            case INTEGER:
                return "int";
            case BIGINT:
                return "long";
            case FLOAT:
                return "float";
            case DOUBLE:
                return "double";
            case DATE:
                return "date";
            case CHAR:
            case VARCHAR:
                return "string";
            case BINARY:
            case VARBINARY:
                return "binary";
            case DECIMAL:
                DecimalType decimalType = (DecimalType) dataType;
                return String.format(
                        "decimal(%d, %d)", decimalType.getPrecision(), decimalType.getScale());
            default:
                throw new UnsupportedOperationException("Unsupported data type: " + dataType);
        }
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, name, required, type, doc);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof IcebergDataField)) {
            return false;
        }

        IcebergDataField that = (IcebergDataField) o;
        return id == that.id
                && Objects.equals(name, that.name)
                && required == that.required
                && Objects.equals(type, that.type)
                && Objects.equals(doc, that.doc);
    }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
new file mode 100644
index 000000000..a3af25a1c
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Iceberg's metadata json file.
+ *
+ * <p>See <a
href="https://iceberg.apache.org/spec/#table-metadata-fields">Iceberg spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergMetadata {
+
+ public static final int CURRENT_FORMAT_VERSION = 2;
+
+ private static final String FIELD_FORMAT_VERSION = "format-version";
+ private static final String FIELD_TABLE_UUID = "table-uuid";
+ private static final String FIELD_LOCATION = "location";
+ private static final String FIELD_LAST_SEQUENCE_NUMBER =
"last-sequence-number";
+ private static final String FIELD_LAST_UPDATED_MS = "last-updated-ms";
+ private static final String FIELD_LAST_COLUMN_ID = "last-column-id";
+ private static final String FIELD_SCHEMAS = "schemas";
+ private static final String FIELD_CURRENT_SCHEMA_ID = "current-schema-id";
+ private static final String FIELD_PARTITION_SPECS = "partition-specs";
+ private static final String FIELD_DEFAULT_SPEC_ID = "default-spec-id";
+ private static final String FIELD_LAST_PARTITION_ID = "last-partition-id";
+ private static final String FIELD_SORT_ORDERS = "sort-orders";
+ private static final String FIELD_DEFAULT_SORT_ORDER_ID =
"default-sort-order-id";
+ private static final String FIELD_SNAPSHOTS = "snapshots";
+ private static final String FIELD_CURRENT_SNAPSHOT_ID =
"current-snapshot-id";
+
+ @JsonProperty(FIELD_FORMAT_VERSION)
+ private final int formatVersion;
+
+ @JsonProperty(FIELD_TABLE_UUID)
+ private final String tableUuid;
+
+ @JsonProperty(FIELD_LOCATION)
+ private final String location;
+
+ @JsonProperty(FIELD_LAST_SEQUENCE_NUMBER)
+ private final long lastSequenceNumber;
+
+ @JsonProperty(FIELD_LAST_UPDATED_MS)
+ private final long lastUpdatedMs;
+
+ @JsonProperty(FIELD_LAST_COLUMN_ID)
+ private final int lastColumnId;
+
+ @JsonProperty(FIELD_SCHEMAS)
+ private final List<IcebergSchema> schemas;
+
+ @JsonProperty(FIELD_CURRENT_SCHEMA_ID)
+ private final int currentSchemaId;
+
+ @JsonProperty(FIELD_PARTITION_SPECS)
+ private final List<IcebergPartitionSpec> partitionSpecs;
+
+ @JsonProperty(FIELD_DEFAULT_SPEC_ID)
+ private final int defaultSpecId;
+
+ @JsonProperty(FIELD_LAST_PARTITION_ID)
+ private final int lastPartitionId;
+
+ @JsonProperty(FIELD_SORT_ORDERS)
+ private final List<IcebergSortOrder> sortOrders;
+
+ @JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID)
+ private final int defaultSortOrderId;
+
+ @JsonProperty(FIELD_SNAPSHOTS)
+ private final List<IcebergSnapshot> snapshots;
+
+ @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID)
+ private final int currentSnapshotId;
+
+ public IcebergMetadata(
+ String tableUuid,
+ String location,
+ long lastSequenceNumber,
+ int lastColumnId,
+ List<IcebergSchema> schemas,
+ int currentSchemaId,
+ List<IcebergPartitionSpec> partitionSpecs,
+ int lastPartitionId,
+ List<IcebergSnapshot> snapshots,
+ int currentSnapshotId) {
+ this(
+ CURRENT_FORMAT_VERSION,
+ tableUuid,
+ location,
+ lastSequenceNumber,
+ System.currentTimeMillis(),
+ lastColumnId,
+ schemas,
+ currentSchemaId,
+ partitionSpecs,
+ IcebergPartitionSpec.SPEC_ID,
+ lastPartitionId,
+ Collections.singletonList(new IcebergSortOrder()),
+ IcebergSortOrder.ORDER_ID,
+ snapshots,
+ currentSnapshotId);
+ }
+
+ @JsonCreator
+ public IcebergMetadata(
+ @JsonProperty(FIELD_FORMAT_VERSION) int formatVersion,
+ @JsonProperty(FIELD_TABLE_UUID) String tableUuid,
+ @JsonProperty(FIELD_LOCATION) String location,
+ @JsonProperty(FIELD_LAST_SEQUENCE_NUMBER) long lastSequenceNumber,
+ @JsonProperty(FIELD_LAST_UPDATED_MS) long lastUpdatedMs,
+ @JsonProperty(FIELD_LAST_COLUMN_ID) int lastColumnId,
+ @JsonProperty(FIELD_SCHEMAS) List<IcebergSchema> schemas,
+ @JsonProperty(FIELD_CURRENT_SCHEMA_ID) int currentSchemaId,
+ @JsonProperty(FIELD_PARTITION_SPECS) List<IcebergPartitionSpec>
partitionSpecs,
+ @JsonProperty(FIELD_DEFAULT_SPEC_ID) int defaultSpecId,
+ @JsonProperty(FIELD_LAST_PARTITION_ID) int lastPartitionId,
+ @JsonProperty(FIELD_SORT_ORDERS) List<IcebergSortOrder> sortOrders,
+ @JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID) int defaultSortOrderId,
+ @JsonProperty(FIELD_SNAPSHOTS) List<IcebergSnapshot> snapshots,
+ @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) int currentSnapshotId) {
+ this.formatVersion = formatVersion;
+ this.tableUuid = tableUuid;
+ this.location = location;
+ this.lastSequenceNumber = lastSequenceNumber;
+ this.lastUpdatedMs = lastUpdatedMs;
+ this.lastColumnId = lastColumnId;
+ this.schemas = schemas;
+ this.currentSchemaId = currentSchemaId;
+ this.partitionSpecs = partitionSpecs;
+ this.defaultSpecId = defaultSpecId;
+ this.lastPartitionId = lastPartitionId;
+ this.sortOrders = sortOrders;
+ this.defaultSortOrderId = defaultSortOrderId;
+ this.snapshots = snapshots;
+ this.currentSnapshotId = currentSnapshotId;
+ }
+
+ @JsonGetter(FIELD_FORMAT_VERSION)
+ public int formatVersion() {
+ return formatVersion;
+ }
+
+ @JsonGetter(FIELD_TABLE_UUID)
+ public String tableUuid() {
+ return tableUuid;
+ }
+
+ @JsonGetter(FIELD_LOCATION)
+ public String location() {
+ return location;
+ }
+
+ @JsonGetter(FIELD_LAST_SEQUENCE_NUMBER)
+ public long lastSequenceNumber() {
+ return lastSequenceNumber;
+ }
+
+ @JsonGetter(FIELD_LAST_UPDATED_MS)
+ public long lastUpdatedMs() {
+ return lastUpdatedMs;
+ }
+
+ @JsonGetter(FIELD_LAST_COLUMN_ID)
+ public int lastColumnId() {
+ return lastColumnId;
+ }
+
+ @JsonGetter(FIELD_SCHEMAS)
+ public List<IcebergSchema> schemas() {
+ return schemas;
+ }
+
+ @JsonGetter(FIELD_CURRENT_SCHEMA_ID)
+ public int currentSchemaId() {
+ return currentSchemaId;
+ }
+
+ @JsonGetter(FIELD_PARTITION_SPECS)
+ public List<IcebergPartitionSpec> partitionSpecs() {
+ return partitionSpecs;
+ }
+
+ @JsonGetter(FIELD_DEFAULT_SPEC_ID)
+ public int defaultSpecId() {
+ return defaultSpecId;
+ }
+
+ @JsonGetter(FIELD_LAST_PARTITION_ID)
+ public int lastPartitionId() {
+ return lastPartitionId;
+ }
+
+ @JsonGetter(FIELD_SORT_ORDERS)
+ public List<IcebergSortOrder> sortOrders() {
+ return sortOrders;
+ }
+
+ @JsonGetter(FIELD_DEFAULT_SORT_ORDER_ID)
+ public int defaultSortOrderId() {
+ return defaultSortOrderId;
+ }
+
+ @JsonGetter(FIELD_SNAPSHOTS)
+ public List<IcebergSnapshot> snapshots() {
+ return snapshots;
+ }
+
+ @JsonGetter(FIELD_CURRENT_SNAPSHOT_ID)
+ public int currentSnapshotId() {
+ return currentSnapshotId;
+ }
+
+ public IcebergSnapshot currentSnapshot() {
+ for (IcebergSnapshot snapshot : snapshots) {
+ if (snapshot.snapshotId() == currentSnapshotId) {
+ return snapshot;
+ }
+ }
+ throw new RuntimeException(
+ "Cannot find snapshot with id " + currentSnapshotId + ", this
is unexpected.");
+ }
+
+ public String toJson() {
+ return JsonSerdeUtil.toJson(this);
+ }
+
+ public static IcebergMetadata fromJson(String json) {
+ return JsonSerdeUtil.fromJson(json, IcebergMetadata.class);
+ }
+
+ public static IcebergMetadata fromPath(FileIO fileIO, Path path) {
+ try {
+ String json = fileIO.readFileUtf8(path);
+ return fromJson(json);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read Iceberg metadata from
path " + path, e);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ formatVersion,
+ tableUuid,
+ location,
+ lastSequenceNumber,
+ lastUpdatedMs,
+ lastColumnId,
+ schemas,
+ currentSchemaId,
+ partitionSpecs,
+ defaultSpecId,
+ lastPartitionId,
+ sortOrders,
+ defaultSortOrderId,
+ snapshots,
+ currentSnapshotId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergMetadata)) {
+ return false;
+ }
+
+ IcebergMetadata that = (IcebergMetadata) o;
+ return formatVersion == that.formatVersion
+ && Objects.equals(tableUuid, that.tableUuid)
+ && Objects.equals(location, that.location)
+ && lastSequenceNumber == that.lastSequenceNumber
+ && lastUpdatedMs == that.lastUpdatedMs
+ && lastColumnId == that.lastColumnId
+ && Objects.equals(schemas, that.schemas)
+ && currentSchemaId == that.currentSchemaId
+ && Objects.equals(partitionSpecs, that.partitionSpecs)
+ && defaultSpecId == that.defaultSpecId
+ && lastPartitionId == that.lastPartitionId
+ && Objects.equals(sortOrders, that.sortOrders)
+ && defaultSortOrderId == that.defaultSortOrderId
+ && Objects.equals(snapshots, that.snapshots)
+ && currentSnapshotId == that.currentSnapshotId;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java
new file mode 100644
index 000000000..7be0d0493
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.types.DataField;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Partition field in Iceberg's partition spec.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#partition-specs">Iceberg
spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergPartitionField {
+
+ // not sure why, but the sample in Iceberg spec is like this
+ public static final int FIRST_FIELD_ID = 1000;
+
+ private static final String FIELD_NAME = "name";
+ private static final String FIELD_TRANSFORM = "transform";
+ private static final String FIELD_SOURCE_ID = "source-id";
+ private static final String FIELD_FIELD_ID = "field-id";
+
+ @JsonProperty(FIELD_NAME)
+ private final String name;
+
+ @JsonProperty(FIELD_TRANSFORM)
+ private final String transform;
+
+ @JsonProperty(FIELD_SOURCE_ID)
+ private final int sourceId;
+
+ @JsonProperty(FIELD_FIELD_ID)
+ private final int fieldId;
+
+ public IcebergPartitionField(DataField dataField, int fieldId) {
+ this(
+ dataField.name(),
+ // currently Paimon's partition value does not have any
transformation
+ "identity",
+ dataField.id(),
+ fieldId);
+ }
+
+ @JsonCreator
+ public IcebergPartitionField(
+ @JsonProperty(FIELD_NAME) String name,
+ @JsonProperty(FIELD_TRANSFORM) String transform,
+ @JsonProperty(FIELD_SOURCE_ID) int sourceId,
+ @JsonProperty(FIELD_FIELD_ID) int fieldId) {
+ this.name = name;
+ this.transform = transform;
+ this.sourceId = sourceId;
+ this.fieldId = fieldId;
+ }
+
+ @JsonGetter(FIELD_NAME)
+ public String name() {
+ return name;
+ }
+
+ @JsonGetter(FIELD_TRANSFORM)
+ public String transform() {
+ return transform;
+ }
+
+ @JsonGetter(FIELD_SOURCE_ID)
+ public int sourceId() {
+ return sourceId;
+ }
+
+ @JsonGetter(FIELD_FIELD_ID)
+ public int fieldId() {
+ return fieldId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, transform, sourceId, fieldId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergPartitionField)) {
+ return false;
+ }
+
+ IcebergPartitionField that = (IcebergPartitionField) o;
+ return Objects.equals(name, that.name)
+ && Objects.equals(transform, that.transform)
+ && sourceId == that.sourceId
+ && fieldId == that.fieldId;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java
new file mode 100644
index 000000000..343a8c769
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Partition spec in Iceberg's metadata.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#partition-specs">Iceberg
spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergPartitionSpec {
+
+ // always 0, Paimon does not support partition evolution
+ public static final int SPEC_ID = 0;
+
+ private static final String FIELD_SPEC_ID = "spec-id";
+ private static final String FIELD_FIELDS = "fields";
+
+ @JsonProperty(FIELD_SPEC_ID)
+ private final int specId;
+
+ @JsonProperty(FIELD_FIELDS)
+ private final List<IcebergPartitionField> fields;
+
+ public IcebergPartitionSpec(List<IcebergPartitionField> fields) {
+ this(SPEC_ID, fields);
+ }
+
+ @JsonCreator
+ public IcebergPartitionSpec(
+ @JsonProperty(FIELD_SPEC_ID) int specId,
+ @JsonProperty(FIELD_FIELDS) List<IcebergPartitionField> fields) {
+ this.specId = specId;
+ this.fields = fields;
+ }
+
+ @JsonGetter(FIELD_SPEC_ID)
+ public int specId() {
+ return specId;
+ }
+
+ @JsonGetter(FIELD_FIELDS)
+ public List<IcebergPartitionField> fields() {
+ return fields;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(specId, fields);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergPartitionSpec)) {
+ return false;
+ }
+
+ IcebergPartitionSpec that = (IcebergPartitionSpec) o;
+ return specId == that.specId && Objects.equals(fields, that.fields);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
new file mode 100644
index 000000000..b3c82021e
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.schema.TableSchema;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Schema in Iceberg's metadata.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#schemas">Iceberg spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergSchema {
+
+ private static final String FIELD_TYPE = "type";
+ private static final String FIELD_SCHEMA_ID = "schema-id";
+ private static final String FIELD_FIELDS = "fields";
+
+ @JsonProperty(FIELD_TYPE)
+ private final String type;
+
+ @JsonProperty(FIELD_SCHEMA_ID)
+ private final int schemaId;
+
+ @JsonProperty(FIELD_FIELDS)
+ private final List<IcebergDataField> fields;
+
+ public IcebergSchema(TableSchema tableSchema) {
+ this(
+ (int) tableSchema.id(),
+ tableSchema.fields().stream()
+ .map(IcebergDataField::new)
+ .collect(Collectors.toList()));
+ }
+
+ public IcebergSchema(int schemaId, List<IcebergDataField> fields) {
+ this("struct", schemaId, fields);
+ }
+
+ @JsonCreator
+ public IcebergSchema(
+ @JsonProperty(FIELD_TYPE) String type,
+ @JsonProperty(FIELD_SCHEMA_ID) int schemaId,
+ @JsonProperty(FIELD_FIELDS) List<IcebergDataField> fields) {
+ this.type = type;
+ this.schemaId = schemaId;
+ this.fields = fields;
+ }
+
+ @JsonGetter(FIELD_TYPE)
+ public String type() {
+ return type;
+ }
+
+ @JsonGetter(FIELD_SCHEMA_ID)
+ public int schemaId() {
+ return schemaId;
+ }
+
+ @JsonGetter(FIELD_FIELDS)
+ public List<IcebergDataField> fields() {
+ return fields;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, schemaId, fields);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergSchema)) {
+ return false;
+ }
+
+ IcebergSchema that = (IcebergSchema) o;
+ return Objects.equals(type, that.type)
+ && schemaId == that.schemaId
+ && Objects.equals(fields, that.fields);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
new file mode 100644
index 000000000..df0224d22
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Snapshot in Iceberg's metadata.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#snapshots">Iceberg
spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergSnapshot {
+
+ private static final String FIELD_SEQUENCE_NUMBER = "sequence-number";
+ private static final String FIELD_SNAPSHOT_ID = "snapshot-id";
+ private static final String FIELD_TIMESTAMP_MS = "timestamp-ms";
+ private static final String FIELD_SUMMARY = "summary";
+ private static final String FIELD_MANIFEST_LIST = "manifest-list";
+ private static final String FIELD_SCHEMA_ID = "schema-id";
+
+ @JsonProperty(FIELD_SEQUENCE_NUMBER)
+ private final long sequenceNumber;
+
+ @JsonProperty(FIELD_SNAPSHOT_ID)
+ private final long snapshotId;
+
+ @JsonProperty(FIELD_TIMESTAMP_MS)
+ private final long timestampMs;
+
+ @JsonProperty(FIELD_SUMMARY)
+ private final IcebergSnapshotSummary summary;
+
+ @JsonProperty(FIELD_MANIFEST_LIST)
+ private final String manifestList;
+
+ @JsonProperty(FIELD_SCHEMA_ID)
+ private final int schemaId;
+
+ @JsonCreator
+ public IcebergSnapshot(
+ @JsonProperty(FIELD_SEQUENCE_NUMBER) long sequenceNumber,
+ @JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId,
+ @JsonProperty(FIELD_TIMESTAMP_MS) long timestampMs,
+ @JsonProperty(FIELD_SUMMARY) IcebergSnapshotSummary summary,
+ @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
+ @JsonProperty(FIELD_SCHEMA_ID) int schemaId) {
+ this.sequenceNumber = sequenceNumber;
+ this.snapshotId = snapshotId;
+ this.timestampMs = timestampMs;
+ this.summary = summary;
+ this.manifestList = manifestList;
+ this.schemaId = schemaId;
+ }
+
+ @JsonGetter(FIELD_SEQUENCE_NUMBER)
+ public long sequenceNumber() {
+ return sequenceNumber;
+ }
+
+ @JsonGetter(FIELD_SNAPSHOT_ID)
+ public long snapshotId() {
+ return snapshotId;
+ }
+
+ @JsonGetter(FIELD_TIMESTAMP_MS)
+ public long timestampMs() {
+ return timestampMs;
+ }
+
+ @JsonGetter(FIELD_SUMMARY)
+ public IcebergSnapshotSummary summary() {
+ return summary;
+ }
+
+ @JsonGetter(FIELD_MANIFEST_LIST)
+ public String manifestList() {
+ return manifestList;
+ }
+
+ @JsonGetter(FIELD_SCHEMA_ID)
+ public int schemaId() {
+ return schemaId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ sequenceNumber, snapshotId, timestampMs, summary,
manifestList, schemaId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergSnapshot)) {
+ return false;
+ }
+
+ IcebergSnapshot that = (IcebergSnapshot) o;
+ return sequenceNumber == that.sequenceNumber
+ && snapshotId == that.snapshotId
+ && timestampMs == that.timestampMs
+ && Objects.equals(summary, that.summary)
+ && Objects.equals(manifestList, that.manifestList)
+ && schemaId == that.schemaId;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java
new file mode 100644
index 000000000..0c70331ee
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Snapshot summary in Iceberg's snapshot.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#snapshots">Iceberg
spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergSnapshotSummary {
+
+ private static final String FIELD_OPERATION = "operation";
+
+ public static final String OPERATION_APPEND = "append";
+ public static final String OPERATION_OVERWRITE = "overwrite";
+
+ @JsonProperty(FIELD_OPERATION)
+ private final String operation;
+
+ @JsonCreator
+ public IcebergSnapshotSummary(@JsonProperty(FIELD_OPERATION) String
operation) {
+ this.operation = operation;
+ }
+
+ @JsonGetter(FIELD_OPERATION)
+ public String operation() {
+ return operation;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(operation);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergSnapshotSummary)) {
+ return false;
+ }
+
+ IcebergSnapshotSummary that = (IcebergSnapshotSummary) o;
+ return Objects.equals(operation, that.operation);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java
new file mode 100644
index 000000000..0ff867006
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Sort order in Iceberg's metadata.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#sort-orders">Iceberg
spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergSortOrder {
+
+ // currently unsupported
+ public static final int ORDER_ID = 0;
+
+ private static final String FIELD_ORDER_ID = "order-id";
+ private static final String FIELD_FIELDS = "fields";
+
+ @JsonProperty(FIELD_ORDER_ID)
+ private final int orderId;
+
+ // currently always empty
+ @JsonProperty(FIELD_FIELDS)
+ private final List<Object> fields;
+
+ public IcebergSortOrder() {
+ this(ORDER_ID, new ArrayList<>());
+ }
+
+ @JsonCreator
+ public IcebergSortOrder(
+ @JsonProperty(FIELD_ORDER_ID) int orderId,
+ @JsonProperty(FIELD_FIELDS) List<Object> fields) {
+ this.orderId = orderId;
+ this.fields = fields;
+ }
+
+ @JsonGetter(FIELD_ORDER_ID)
+ public int orderId() {
+ return orderId;
+ }
+
+ @JsonGetter(FIELD_FIELDS)
+ public List<Object> fields() {
+ return fields;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(orderId, fields);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergSortOrder)) {
+ return false;
+ }
+
+ IcebergSortOrder that = (IcebergSortOrder) o;
+ return orderId == that.orderId && Objects.equals(fields, that.fields);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 108454bb7..3839896eb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergCommitCallback;
import org.apache.paimon.metastore.AddPartitionCommitCallback;
import org.apache.paimon.metastore.AddPartitionTagCallback;
import org.apache.paimon.metastore.MetastoreClient;
@@ -351,7 +352,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
return new TableCommitImpl(
store().newCommit(commitUser),
- createCommitCallbacks(),
+ createCommitCallbacks(commitUser),
snapshotExpire,
options.writeOnly() ? null :
store().newPartitionExpire(commitUser),
options.writeOnly() ? null : store().newTagCreationManager(),
@@ -363,17 +364,19 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
coreOptions().forceCreatingSnapshot());
}
- private List<CommitCallback> createCommitCallbacks() {
+ private List<CommitCallback> createCommitCallbacks(String commitUser) {
List<CommitCallback> callbacks =
new
ArrayList<>(CallbackUtils.loadCommitCallbacks(coreOptions()));
CoreOptions options = coreOptions();
MetastoreClient.Factory metastoreClientFactory =
catalogEnvironment.metastoreClientFactory();
+
if (options.partitionedTableInMetastore()
&& metastoreClientFactory != null
&& tableSchema.partitionKeys().size() > 0) {
callbacks.add(new
AddPartitionCommitCallback(metastoreClientFactory.create()));
}
+
TagPreview tagPreview = TagPreview.create(options);
if (options.tagToPartitionField() != null
&& tagPreview != null
@@ -386,6 +389,11 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
tagPreview);
callbacks.add(callback);
}
+
+ if (options.metadataIcebergCompatible()) {
+ callbacks.add(new IcebergCommitCallback(this, commitUser));
+ }
+
return callbacks;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
new file mode 100644
index 000000000..25700705d
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for Iceberg compatibility. */
+public class IcebergCompatibilityTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testUnpartitionedPrimaryKeyTable() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.STRING(),
DataTypes.INT(), DataTypes.BIGINT()
+ },
+ new String[] {"k1", "k2", "v1", "v2"});
+
+ int numRecords = 1000;
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List<TestRecord> testRecords = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ int k1 = random.nextInt(0, 100);
+ String k2 = String.valueOf(random.nextInt(1000, 1010));
+ int v1 = random.nextInt();
+ long v2 = random.nextLong();
+ testRecords.add(
+ new TestRecord(
+ BinaryRow.EMPTY_ROW,
+ String.format("%d|%s", k1, k2),
+ String.format("%d|%d", v1, v2),
+ GenericRow.of(k1, BinaryString.fromString(k2), v1,
v2)));
+ }
+
+ runCompatibilityTest(
+ rowType,
+ Collections.emptyList(),
+ Arrays.asList("k1", "k2"),
+ testRecords,
+ r -> String.format("%d|%s", r.get(0, Integer.class), r.get(1,
String.class)),
+ r -> String.format("%d|%d", r.get(2, Integer.class), r.get(3,
Long.class)));
+ }
+
+ @Test
+ public void testPartitionedPrimaryKeyTable() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.BIGINT()
+ },
+ new String[] {"pt1", "pt2", "k", "v1", "v2"});
+
+ BiFunction<Integer, String, BinaryRow> binaryRow =
+ (pt1, pt2) -> {
+ BinaryRow b = new BinaryRow(2);
+ BinaryRowWriter writer = new BinaryRowWriter(b);
+ writer.writeInt(0, pt1);
+ writer.writeString(1, BinaryString.fromString(pt2));
+ writer.complete();
+ return b;
+ };
+
+ int numRecords = 1000;
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List<TestRecord> testRecords = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ int pt1 = random.nextInt(0, 2);
+ String pt2 = String.valueOf(random.nextInt(10, 12));
+ String k = String.valueOf(random.nextInt(0, 100));
+ int v1 = random.nextInt();
+ long v2 = random.nextLong();
+ testRecords.add(
+ new TestRecord(
+ binaryRow.apply(pt1, pt2),
+ String.format("%d|%s|%s", pt1, pt2, k),
+ String.format("%d|%d", v1, v2),
+ GenericRow.of(
+ pt1,
+ BinaryString.fromString(pt2),
+ BinaryString.fromString(k),
+ v1,
+ v2)));
+ }
+
+ runCompatibilityTest(
+ rowType,
+ Arrays.asList("pt1", "pt2"),
+ Arrays.asList("pt1", "pt2", "k"),
+ testRecords,
+ r ->
+ String.format(
+ "%d|%s|%s",
+ r.get(0, Integer.class),
+ r.get(1, String.class),
+ r.get(2, String.class)),
+ r -> String.format("%d|%d", r.get(3, Integer.class), r.get(4,
Long.class)));
+ }
+
+ @Test
+ public void testAppendOnlyTableWithAllTypes() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.BOOLEAN(),
+ DataTypes.BIGINT(),
+ DataTypes.FLOAT(),
+ DataTypes.DOUBLE(),
+ DataTypes.DECIMAL(8, 3),
+ DataTypes.CHAR(20),
+ DataTypes.STRING(),
+ DataTypes.BINARY(20),
+ DataTypes.VARBINARY(20),
+ DataTypes.DATE()
+ },
+ new String[] {
+ "pt",
+ "v_boolean",
+ "v_bigint",
+ "v_float",
+ "v_double",
+ "v_decimal",
+ "v_char",
+ "v_varchar",
+ "v_binary",
+ "v_varbinary",
+ "v_date"
+ });
+
+ Function<Integer, BinaryRow> binaryRow =
+ (pt) -> {
+ BinaryRow b = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(b);
+ writer.writeInt(0, pt);
+ writer.complete();
+ return b;
+ };
+
+ int numRecords = 1000;
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List<TestRecord> testRecords = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ int pt = random.nextInt(0, 2);
+ boolean vBoolean = random.nextBoolean();
+ long vBigInt = random.nextLong();
+ float vFloat = random.nextFloat();
+ double vDouble = random.nextDouble();
+ Decimal vDecimal = Decimal.fromUnscaledLong(random.nextLong(0,
100000000), 8, 3);
+ String vChar = String.valueOf(random.nextInt());
+ String vVarChar = String.valueOf(random.nextInt());
+ byte[] vBinary = String.valueOf(random.nextInt()).getBytes();
+ byte[] vVarBinary = String.valueOf(random.nextInt()).getBytes();
+ int vDate = random.nextInt(0, 30000);
+
+ String k =
+ String.format(
+ "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s",
+ pt,
+ vBoolean,
+ vBigInt,
+ vFloat,
+ vDouble,
+ vDecimal,
+ vChar,
+ vVarChar,
+ new String(vBinary),
+ new String(vVarBinary),
+ LocalDate.ofEpochDay(vDate));
+ testRecords.add(
+ new TestRecord(
+ binaryRow.apply(pt),
+ k,
+ "",
+ GenericRow.of(
+ pt,
+ vBoolean,
+ vBigInt,
+ vFloat,
+ vDouble,
+ vDecimal,
+ BinaryString.fromString(vChar),
+ BinaryString.fromString(vVarChar),
+ vBinary,
+ vVarBinary,
+ vDate)));
+ }
+
+ runCompatibilityTest(
+ rowType,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ testRecords,
+ r ->
+ String.format(
+ "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s",
+ r.get(0),
+ r.get(1),
+ r.get(2),
+ r.get(3),
+ r.get(4),
+ r.get(5),
+ r.get(6),
+ r.get(7),
+ new String(r.get(8, ByteBuffer.class).array()),
+ new String(r.get(9, ByteBuffer.class).array()),
+ r.get(10)),
+ r -> "");
+ }
+
+ private void runCompatibilityTest(
+ RowType rowType,
+ List<String> partitionKeys,
+ List<String> primaryKeys,
+ List<TestRecord> testRecords,
+ Function<Record, String> icebergRecordToKey,
+ Function<Record, String> icebergRecordToValue)
+ throws Exception {
+ LocalFileIO fileIO = LocalFileIO.create();
+ Path path = new Path(tempDir.toString());
+
+ Options options = new Options();
+ if (!primaryKeys.isEmpty()) {
+ options.set(CoreOptions.BUCKET, 2);
+ }
+ options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
+ options.set(CoreOptions.FILE_FORMAT, "avro");
+ Schema schema =
+ new Schema(rowType.getFields(), partitionKeys, primaryKeys,
options.toMap(), "");
+
+ FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path);
+ paimonCatalog.createDatabase("mydb", false);
+ Identifier paimonIdentifier = Identifier.create("mydb", "t");
+ paimonCatalog.createTable(paimonIdentifier, schema, false);
+ FileStoreTable table = (FileStoreTable)
paimonCatalog.getTable(paimonIdentifier);
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ Map<String, String> expected = new HashMap<>();
+ for (TestRecord testRecord : testRecords) {
+ expected.put(testRecord.key, testRecord.value);
+ write.write(testRecord.record);
+ }
+
+ if (!primaryKeys.isEmpty()) {
+ for (BinaryRow partition :
+ testRecords.stream().map(t ->
t.partition).collect(Collectors.toSet())) {
+ for (int b = 0; b < 2; b++) {
+ write.compact(partition, b, true);
+ }
+ }
+ }
+ commit.commit(1, write.prepareCommit(true, 1));
+ write.close();
+ commit.close();
+
+ HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(),
tempDir.toString());
+ TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
+ org.apache.iceberg.Table icebergTable =
icebergCatalog.loadTable(icebergIdentifier);
+ CloseableIterable<Record> result =
IcebergGenerics.read(icebergTable).build();
+ Map<String, String> actual = new HashMap<>();
+ for (Record record : result) {
+ actual.put(icebergRecordToKey.apply(record),
icebergRecordToValue.apply(record));
+ }
+ result.close();
+
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private static class TestRecord {
+ private final BinaryRow partition;
+ private final String key;
+ private final String value;
+ private final GenericRow record;
+
+ private TestRecord(BinaryRow partition, String key, String value,
GenericRow record) {
+ this.partition = partition;
+ this.key = key;
+ this.value = value;
+ this.record = record;
+ }
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
index 29e6eadcf..c390b1cbe 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
@@ -41,7 +41,9 @@ import org.apache.avro.file.DataFileWriter;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
@@ -57,6 +59,9 @@ public class AvroFileFormat extends FileFormat {
.defaultValue(SNAPPY_CODEC)
.withDescription("The compression codec for avro");
+ public static final ConfigOption<Map<String, String>>
AVRO_ROW_NAME_MAPPING =
+ ConfigOptions.key("row-name-mapping").mapType().defaultValue(new
HashMap<>());
+
private final FormatContext context;
public AvroFileFormat(FormatContext context) {
@@ -85,7 +90,7 @@ public class AvroFileFormat extends FileFormat {
public void validateDataFields(RowType rowType) {
List<DataType> fieldTypes = rowType.getFieldTypes();
for (DataType dataType : fieldTypes) {
- AvroSchemaConverter.convertToSchema(dataType);
+ AvroSchemaConverter.convertToSchema(dataType, new HashMap<>());
}
}
@@ -110,7 +115,10 @@ public class AvroFileFormat extends FileFormat {
this.factory =
new AvroWriterFactory<>(
(out, compression) -> {
- Schema schema =
AvroSchemaConverter.convertToSchema(rowType);
+ Schema schema =
+ AvroSchemaConverter.convertToSchema(
+ rowType,
+
context.formatOptions().get(AVRO_ROW_NAME_MAPPING));
AvroRowDatumWriter datumWriter = new
AvroRowDatumWriter(rowType);
DataFileWriter<InternalRow> dataFileWriter =
new DataFileWriter<>(datumWriter);
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
index bfca9006a..5abc98b26 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
@@ -35,6 +35,7 @@ import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import java.util.List;
+import java.util.Map;
/** Converts an Avro schema into Paimon's type information. */
public class AvroSchemaConverter {
@@ -52,8 +53,8 @@ public class AvroSchemaConverter {
* nested type
* @return Avro's {@link Schema} matching this logical type.
*/
- public static Schema convertToSchema(DataType schema) {
- return convertToSchema(schema,
"org.apache.paimon.avro.generated.record");
+ public static Schema convertToSchema(DataType schema, Map<String, String>
rowNameMapping) {
+ return convertToSchema(schema,
"org.apache.paimon.avro.generated.record", rowNameMapping);
}
/**
@@ -66,7 +67,12 @@ public class AvroSchemaConverter {
* @param rowName the record name
* @return Avro's {@link Schema} matching this logical type.
*/
- public static Schema convertToSchema(DataType dataType, String rowName) {
+ public static Schema convertToSchema(
+ DataType dataType, String rowName, Map<String, String>
rowNameMapping) {
+ if (rowNameMapping.containsKey(rowName)) {
+ rowName = rowNameMapping.get(rowName);
+ }
+
int precision;
boolean nullable = dataType.isNullable();
switch (dataType.getTypeRoot()) {
@@ -166,7 +172,11 @@ public class AvroSchemaConverter {
DataType fieldType = rowType.getTypeAt(i);
SchemaBuilder.GenericDefault<Schema> fieldBuilder =
builder.name(fieldName)
- .type(convertToSchema(fieldType, rowName +
"_" + fieldName));
+ .type(
+ convertToSchema(
+ fieldType,
+ rowName + "_" + fieldName,
+ rowNameMapping));
if (fieldType.isNullable()) {
builder = fieldBuilder.withDefault(null);
@@ -183,14 +193,20 @@ public class AvroSchemaConverter {
.map()
.values(
convertToSchema(
-
extractValueTypeToAvroMap(dataType), rowName));
+
extractValueTypeToAvroMap(dataType),
+ rowName,
+ rowNameMapping));
return nullable ? nullableSchema(map) : map;
case ARRAY:
ArrayType arrayType = (ArrayType) dataType;
Schema array =
SchemaBuilder.builder()
.array()
-
.items(convertToSchema(arrayType.getElementType(), rowName));
+ .items(
+ convertToSchema(
+ arrayType.getElementType(),
+ rowName,
+ rowNameMapping));
return nullable ? nullableSchema(array) : array;
default:
throw new UnsupportedOperationException(