This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 512e2ce05c [iceberg] Introduce 
metadata.iceberg.manifest-legacy-version (#4621)
512e2ce05c is described below

commit 512e2ce05c15c388f81bf2b645546c9a29071a7b
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 2 19:06:20 2024 +0800

    [iceberg] Introduce metadata.iceberg.manifest-legacy-version (#4621)
---
 docs/content/migration/iceberg-compatibility.md    | 11 ++++++++
 .../org/apache/paimon/iceberg/IcebergOptions.java  |  7 ++++++
 .../iceberg/manifest/IcebergManifestFileMeta.java  | 29 +++++++++++++++++++++-
 .../IcebergManifestFileMetaSerializer.java         |  5 ++--
 .../iceberg/manifest/IcebergManifestList.java      | 24 ++++++++++--------
 .../paimon/iceberg/IcebergCompatibilityTest.java   | 15 +++++++++--
 6 files changed, 75 insertions(+), 16 deletions(-)

diff --git a/docs/content/migration/iceberg-compatibility.md 
b/docs/content/migration/iceberg-compatibility.md
index 7b83936b53..d745607148 100644
--- a/docs/content/migration/iceberg-compatibility.md
+++ b/docs/content/migration/iceberg-compatibility.md
@@ -377,9 +377,20 @@ you also need to set some (or all) of the following table 
options when creating
       <td>String</td>
       <td>Compression for Iceberg manifest files.</td>
     </tr>
+    <tr>
+      <td><h5>metadata.iceberg.manifest-legacy-version</h5></td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether to use the legacy manifest version to generate Iceberg 1.4 manifest files.</td>
+    </tr>
     </tbody>
 </table>
 
+## AWS Athena
+
+AWS Athena may use an old manifest reader that reads Iceberg manifests by field name, so Paimon needs to produce a legacy Iceberg
+manifest list file. To do this, enable `'metadata.iceberg.manifest-legacy-version'`.
+
 ## Trino Iceberg
 
 In this example, we use Trino Iceberg connector to access Paimon table through 
Iceberg Hive catalog.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index 3900233d21..c0ceed97ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -77,6 +77,13 @@ public class IcebergOptions {
                             "gzip") // some Iceberg reader cannot support 
zstd, for example DuckDB
                     .withDescription("Compression for Iceberg manifest 
files.");
 
+    public static final ConfigOption<Boolean> MANIFEST_LEGACY_VERSION =
+            key("metadata.iceberg.manifest-legacy-version")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Should use the legacy manifest version to 
generate Iceberg's 1.4 manifest files.");
+
     /** Where to store Iceberg metadata. */
     public enum StorageType implements DescribedEnum {
         DISABLED("disabled", "Disable Iceberg compatibility support."),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
index fd9c2daf7e..c5fcb6005f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
@@ -165,7 +165,11 @@ public class IcebergManifestFileMeta {
         return partitions;
     }
 
-    public static RowType schema() {
+    public static RowType schema(boolean legacyVersion) {
+        return legacyVersion ? schemaForIceberg1_4() : schemaForIcebergNew();
+    }
+
+    private static RowType schemaForIcebergNew() {
         List<DataField> fields = new ArrayList<>();
         fields.add(new DataField(500, "manifest_path", 
DataTypes.STRING().notNull()));
         fields.add(new DataField(501, "manifest_length", 
DataTypes.BIGINT().notNull()));
@@ -186,6 +190,29 @@ public class IcebergManifestFileMeta {
         return new RowType(false, fields);
     }
 
+    private static RowType schemaForIceberg1_4() {
+        // see https://github.com/apache/iceberg/pull/5338
+        // some readers still expect the old schema, for example AWS Athena
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(500, "manifest_path", 
DataTypes.STRING().notNull()));
+        fields.add(new DataField(501, "manifest_length", 
DataTypes.BIGINT().notNull()));
+        fields.add(new DataField(502, "partition_spec_id", 
DataTypes.INT().notNull()));
+        fields.add(new DataField(517, "content", DataTypes.INT().notNull()));
+        fields.add(new DataField(515, "sequence_number", 
DataTypes.BIGINT().notNull()));
+        fields.add(new DataField(516, "min_sequence_number", 
DataTypes.BIGINT().notNull()));
+        fields.add(new DataField(503, "added_snapshot_id", 
DataTypes.BIGINT()));
+        fields.add(new DataField(504, "added_data_files_count", 
DataTypes.INT().notNull()));
+        fields.add(new DataField(505, "existing_data_files_count", 
DataTypes.INT().notNull()));
+        fields.add(new DataField(506, "deleted_data_files_count", 
DataTypes.INT().notNull()));
+        fields.add(new DataField(512, "added_rows_count", 
DataTypes.BIGINT().notNull()));
+        fields.add(new DataField(513, "existing_rows_count", 
DataTypes.BIGINT().notNull()));
+        fields.add(new DataField(514, "deleted_rows_count", 
DataTypes.BIGINT().notNull()));
+        fields.add(
+                new DataField(
+                        508, "partitions", 
DataTypes.ARRAY(IcebergPartitionSummary.schema())));
+        return new RowType(false, fields);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
index c40a26e8fd..2b4c9b771c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ObjectSerializer;
 
 import java.util.ArrayList;
@@ -36,8 +37,8 @@ public class IcebergManifestFileMetaSerializer extends 
ObjectSerializer<IcebergM
 
     private final IcebergPartitionSummarySerializer partitionSummarySerializer;
 
-    public IcebergManifestFileMetaSerializer() {
-        super(IcebergManifestFileMeta.schema());
+    public IcebergManifestFileMetaSerializer(RowType schema) {
+        super(schema);
         this.partitionSummarySerializer = new 
IcebergPartitionSummarySerializer();
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
index ab5cc926cd..ef78969a24 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
@@ -20,13 +20,12 @@ package org.apache.paimon.iceberg.manifest;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.FormatReaderFactory;
-import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.iceberg.IcebergOptions;
 import org.apache.paimon.iceberg.IcebergPathFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ObjectsFile;
 import org.apache.paimon.utils.PathFactory;
 
@@ -38,16 +37,16 @@ public class IcebergManifestList extends 
ObjectsFile<IcebergManifestFileMeta> {
 
     public IcebergManifestList(
             FileIO fileIO,
-            FormatReaderFactory readerFactory,
-            FormatWriterFactory writerFactory,
+            FileFormat fileFormat,
+            RowType manifestType,
             String compression,
             PathFactory pathFactory) {
         super(
                 fileIO,
-                new IcebergManifestFileMetaSerializer(),
-                IcebergManifestFileMeta.schema(),
-                readerFactory,
-                writerFactory,
+                new IcebergManifestFileMetaSerializer(manifestType),
+                manifestType,
+                fileFormat.createReaderFactory(manifestType),
+                fileFormat.createWriterFactory(manifestType),
                 compression,
                 pathFactory,
                 null);
@@ -65,11 +64,14 @@ public class IcebergManifestList extends 
ObjectsFile<IcebergManifestFileMeta> {
                 "avro.row-name-mapping",
                 "org.apache.paimon.avro.generated.record:manifest_file,"
                         + "manifest_file_partitions:r508");
-        FileFormat manifestListAvro = FileFormat.fromIdentifier("avro", 
avroOptions);
+        FileFormat fileFormat = FileFormat.fromIdentifier("avro", avroOptions);
+        RowType manifestType =
+                IcebergManifestFileMeta.schema(
+                        
avroOptions.get(IcebergOptions.MANIFEST_LEGACY_VERSION));
         return new IcebergManifestList(
                 table.fileIO(),
-                
manifestListAvro.createReaderFactory(IcebergManifestFileMeta.schema()),
-                
manifestListAvro.createWriterFactory(IcebergManifestFileMeta.schema()),
+                fileFormat,
+                manifestType,
                 avroOptions.get(IcebergOptions.MANIFEST_COMPRESSION),
                 pathFactory.manifestListFactory());
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 45cfe109b9..b069ac031d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -78,6 +78,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for Iceberg compatibility. */
 public class IcebergCompatibilityTest {
@@ -309,11 +310,21 @@ public class IcebergCompatibilityTest {
         assertThat(manifestFile.compression()).isEqualTo("gzip");
 
         Set<String> usingManifests = new HashSet<>();
-        for (IcebergManifestFileMeta fileMeta :
-                manifestList.read(new 
Path(metadata.currentSnapshot().manifestList()).getName())) {
+        String manifestListFile = new 
Path(metadata.currentSnapshot().manifestList()).getName();
+        for (IcebergManifestFileMeta fileMeta : 
manifestList.read(manifestListFile)) {
             usingManifests.add(fileMeta.manifestPath());
         }
 
+        IcebergManifestList legacyManifestList =
+                IcebergManifestList.create(
+                        table.copy(
+                                Collections.singletonMap(
+                                        
IcebergOptions.MANIFEST_LEGACY_VERSION.key(), "true")),
+                        pathFactory);
+        assertThatThrownBy(() -> legacyManifestList.read(manifestListFile))
+                .rootCause()
+                .isInstanceOf(NullPointerException.class);
+
         Set<String> unusedFiles = new HashSet<>();
         for (int i = 0; i < 2; i++) {
             unusedFiles.add(metadata.snapshots().get(i).manifestList());

Reply via email to