This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 512e2ce05c [iceberg] Introduce
metadata.iceberg.manifest-legacy-version (#4621)
512e2ce05c is described below
commit 512e2ce05c15c388f81bf2b645546c9a29071a7b
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 2 19:06:20 2024 +0800
[iceberg] Introduce metadata.iceberg.manifest-legacy-version (#4621)
---
docs/content/migration/iceberg-compatibility.md | 11 ++++++++
.../org/apache/paimon/iceberg/IcebergOptions.java | 7 ++++++
.../iceberg/manifest/IcebergManifestFileMeta.java | 29 +++++++++++++++++++++-
.../IcebergManifestFileMetaSerializer.java | 5 ++--
.../iceberg/manifest/IcebergManifestList.java | 24 ++++++++++--------
.../paimon/iceberg/IcebergCompatibilityTest.java | 15 +++++++++--
6 files changed, 75 insertions(+), 16 deletions(-)
diff --git a/docs/content/migration/iceberg-compatibility.md
b/docs/content/migration/iceberg-compatibility.md
index 7b83936b53..d745607148 100644
--- a/docs/content/migration/iceberg-compatibility.md
+++ b/docs/content/migration/iceberg-compatibility.md
@@ -377,9 +377,20 @@ you also need to set some (or all) of the following table
options when creating
<td>String</td>
<td>Compression for Iceberg manifest files.</td>
</tr>
+ <tr>
+ <td><h5>metadata.iceberg.manifest-legacy-version</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Should use the legacy manifest version to generate Iceberg's 1.4
manifest files.</td>
+ </tr>
</tbody>
</table>
+## AWS Athena
+
+AWS Athena may use an old manifest reader that reads Iceberg manifest list fields by their legacy names, so Paimon must produce a legacy Iceberg
+manifest list file. To do so, set `'metadata.iceberg.manifest-legacy-version'` to `true`.
+
## Trino Iceberg
In this example, we use Trino Iceberg connector to access Paimon table through
Iceberg Hive catalog.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index 3900233d21..c0ceed97ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -77,6 +77,13 @@ public class IcebergOptions {
"gzip") // some Iceberg reader cannot support
zstd, for example DuckDB
.withDescription("Compression for Iceberg manifest
files.");
+ public static final ConfigOption<Boolean> MANIFEST_LEGACY_VERSION =
+ key("metadata.iceberg.manifest-legacy-version")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Should use the legacy manifest version to
generate Iceberg's 1.4 manifest files.");
+
/** Where to store Iceberg metadata. */
public enum StorageType implements DescribedEnum {
DISABLED("disabled", "Disable Iceberg compatibility support."),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
index fd9c2daf7e..c5fcb6005f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
@@ -165,7 +165,11 @@ public class IcebergManifestFileMeta {
return partitions;
}
- public static RowType schema() {
+ public static RowType schema(boolean legacyVersion) {
+ return legacyVersion ? schemaForIceberg1_4() : schemaForIcebergNew();
+ }
+
+ private static RowType schemaForIcebergNew() {
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(500, "manifest_path",
DataTypes.STRING().notNull()));
fields.add(new DataField(501, "manifest_length",
DataTypes.BIGINT().notNull()));
@@ -186,6 +190,29 @@ public class IcebergManifestFileMeta {
return new RowType(false, fields);
}
+ private static RowType schemaForIceberg1_4() {
+ // see https://github.com/apache/iceberg/pull/5338
+ // some reader still want old schema, for example, AWS athena
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(500, "manifest_path",
DataTypes.STRING().notNull()));
+ fields.add(new DataField(501, "manifest_length",
DataTypes.BIGINT().notNull()));
+ fields.add(new DataField(502, "partition_spec_id",
DataTypes.INT().notNull()));
+ fields.add(new DataField(517, "content", DataTypes.INT().notNull()));
+ fields.add(new DataField(515, "sequence_number",
DataTypes.BIGINT().notNull()));
+ fields.add(new DataField(516, "min_sequence_number",
DataTypes.BIGINT().notNull()));
+ fields.add(new DataField(503, "added_snapshot_id",
DataTypes.BIGINT()));
+ fields.add(new DataField(504, "added_data_files_count",
DataTypes.INT().notNull()));
+ fields.add(new DataField(505, "existing_data_files_count",
DataTypes.INT().notNull()));
+ fields.add(new DataField(506, "deleted_data_files_count",
DataTypes.INT().notNull()));
+ fields.add(new DataField(512, "added_rows_count",
DataTypes.BIGINT().notNull()));
+ fields.add(new DataField(513, "existing_rows_count",
DataTypes.BIGINT().notNull()));
+ fields.add(new DataField(514, "deleted_rows_count",
DataTypes.BIGINT().notNull()));
+ fields.add(
+ new DataField(
+ 508, "partitions",
DataTypes.ARRAY(IcebergPartitionSummary.schema())));
+ return new RowType(false, fields);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
index c40a26e8fd..2b4c9b771c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ObjectSerializer;
import java.util.ArrayList;
@@ -36,8 +37,8 @@ public class IcebergManifestFileMetaSerializer extends
ObjectSerializer<IcebergM
private final IcebergPartitionSummarySerializer partitionSummarySerializer;
- public IcebergManifestFileMetaSerializer() {
- super(IcebergManifestFileMeta.schema());
+ public IcebergManifestFileMetaSerializer(RowType schema) {
+ super(schema);
this.partitionSummarySerializer = new
IcebergPartitionSummarySerializer();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
index ab5cc926cd..ef78969a24 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
@@ -20,13 +20,12 @@ package org.apache.paimon.iceberg.manifest;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.FormatReaderFactory;
-import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.iceberg.IcebergOptions;
import org.apache.paimon.iceberg.IcebergPathFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ObjectsFile;
import org.apache.paimon.utils.PathFactory;
@@ -38,16 +37,16 @@ public class IcebergManifestList extends
ObjectsFile<IcebergManifestFileMeta> {
public IcebergManifestList(
FileIO fileIO,
- FormatReaderFactory readerFactory,
- FormatWriterFactory writerFactory,
+ FileFormat fileFormat,
+ RowType manifestType,
String compression,
PathFactory pathFactory) {
super(
fileIO,
- new IcebergManifestFileMetaSerializer(),
- IcebergManifestFileMeta.schema(),
- readerFactory,
- writerFactory,
+ new IcebergManifestFileMetaSerializer(manifestType),
+ manifestType,
+ fileFormat.createReaderFactory(manifestType),
+ fileFormat.createWriterFactory(manifestType),
compression,
pathFactory,
null);
@@ -65,11 +64,14 @@ public class IcebergManifestList extends
ObjectsFile<IcebergManifestFileMeta> {
"avro.row-name-mapping",
"org.apache.paimon.avro.generated.record:manifest_file,"
+ "manifest_file_partitions:r508");
- FileFormat manifestListAvro = FileFormat.fromIdentifier("avro",
avroOptions);
+ FileFormat fileFormat = FileFormat.fromIdentifier("avro", avroOptions);
+ RowType manifestType =
+ IcebergManifestFileMeta.schema(
+
avroOptions.get(IcebergOptions.MANIFEST_LEGACY_VERSION));
return new IcebergManifestList(
table.fileIO(),
-
manifestListAvro.createReaderFactory(IcebergManifestFileMeta.schema()),
-
manifestListAvro.createWriterFactory(IcebergManifestFileMeta.schema()),
+ fileFormat,
+ manifestType,
avroOptions.get(IcebergOptions.MANIFEST_COMPRESSION),
pathFactory.manifestListFactory());
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 45cfe109b9..b069ac031d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -78,6 +78,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for Iceberg compatibility. */
public class IcebergCompatibilityTest {
@@ -309,11 +310,21 @@ public class IcebergCompatibilityTest {
assertThat(manifestFile.compression()).isEqualTo("gzip");
Set<String> usingManifests = new HashSet<>();
- for (IcebergManifestFileMeta fileMeta :
- manifestList.read(new
Path(metadata.currentSnapshot().manifestList()).getName())) {
+ String manifestListFile = new
Path(metadata.currentSnapshot().manifestList()).getName();
+ for (IcebergManifestFileMeta fileMeta :
manifestList.read(manifestListFile)) {
usingManifests.add(fileMeta.manifestPath());
}
+ IcebergManifestList legacyManifestList =
+ IcebergManifestList.create(
+ table.copy(
+ Collections.singletonMap(
+
IcebergOptions.MANIFEST_LEGACY_VERSION.key(), "true")),
+ pathFactory);
+ assertThatThrownBy(() -> legacyManifestList.read(manifestListFile))
+ .rootCause()
+ .isInstanceOf(NullPointerException.class);
+
Set<String> unusedFiles = new HashSet<>();
for (int i = 0; i < 2; i++) {
unusedFiles.add(metadata.snapshots().get(i).manifestList());