This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new deeed7efab [core] Preserving old Iceberg Metadata files (#5228)
deeed7efab is described below
commit deeed7efabd9c08c23c243de99f2b49d2791f585
Author: junmuz <[email protected]>
AuthorDate: Thu Mar 13 09:49:04 2025 +0000
[core] Preserving old Iceberg Metadata files (#5228)
---
.../generated/iceberg_configuration.html | 16 ++++++++++++++--
.../paimon/iceberg/IcebergCommitCallback.java | 22 ++++++++++++++++++++--
.../org/apache/paimon/iceberg/IcebergOptions.java | 20 +++++++++++++++++---
.../paimon/iceberg/IcebergCompatibilityTest.java | 12 ++++++++++--
4 files changed, 61 insertions(+), 9 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/iceberg_configuration.html b/docs/layouts/shortcodes/generated/iceberg_configuration.html
index a71cabb679..caa243dc42 100644
--- a/docs/layouts/shortcodes/generated/iceberg_configuration.html
+++ b/docs/layouts/shortcodes/generated/iceberg_configuration.html
@@ -30,13 +30,13 @@ under the License.
<td><h5>metadata.iceberg.compaction.max.file-num</h5></td>
<td style="word-wrap: break-word;">50</td>
<td>Integer</td>
- <td>If number of small Iceberg metadata files exceeds this limit,
always trigger metadata compaction regardless of their total size.</td>
+ <td>If number of small Iceberg manifest metadata files exceeds
this limit, always trigger manifest metadata compaction regardless of their
total size.</td>
</tr>
<tr>
<td><h5>metadata.iceberg.compaction.min.file-num</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
- <td>Minimum number of Iceberg metadata files to trigger metadata
compaction.</td>
+ <td>Minimum number of Iceberg manifest metadata files to trigger
manifest metadata compaction.</td>
</tr>
<tr>
<td><h5>metadata.iceberg.database</h5></td>
@@ -44,6 +44,12 @@ under the License.
<td>String</td>
<td>Metastore database name for Iceberg Catalog. Set this as an
iceberg database alias if using a centralized Catalog.</td>
</tr>
+ <tr>
+ <td><h5>metadata.iceberg.delete-after-commit.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether to delete old metadata files after each table
commit</td>
+ </tr>
<tr>
<td><h5>metadata.iceberg.glue.skip-archive</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -80,6 +86,12 @@ under the License.
<td>Boolean</td>
<td>Should use the legacy manifest version to generate Iceberg's
1.4 manifest files.</td>
</tr>
+ <tr>
+ <td><h5>metadata.iceberg.previous-versions-max</h5></td>
+ <td style="word-wrap: break-word;">0</td>
+ <td>Integer</td>
+ <td>The number of old metadata files to keep after each table
commit</td>
+ </tr>
<tr>
<td><h5>metadata.iceberg.storage</h5></td>
<td style="word-wrap: break-word;">disabled</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index e1b205d9b0..c20d07b9a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -424,7 +424,7 @@ public class IcebergCommitCallback implements
CommitCallback {
new Path(pathFactory.metadataDirectory(),
VERSION_HINT_FILENAME),
String.valueOf(snapshotId));
- table.fileIO().deleteQuietly(baseMetadataPath);
+ deleteApplicableMetadataFiles(snapshotId);
for (int i = 0; i + 1 < toExpireExceptLast.size(); i++) {
expireManifestList(
new
Path(toExpireExceptLast.get(i).manifestList()).getName(),
@@ -752,7 +752,25 @@ public class IcebergCommitCallback implements
CommitCallback {
}
table.fileIO().deleteQuietly(listPath);
}
- table.fileIO().deleteQuietly(path);
+ deleteApplicableMetadataFiles(snapshotId);
+ }
+ }
+
+ private void deleteApplicableMetadataFiles(long snapshotId) throws
IOException {
+ Options options = new Options(table.options());
+ if (options.get(IcebergOptions.METADATA_DELETE_AFTER_COMMIT)) {
+ long earliestMetadataId =
+ snapshotId -
options.get(IcebergOptions.METADATA_PREVIOUS_VERSIONS_MAX);
+ if (earliestMetadataId > 0) {
+ Iterator<Path> it =
+ pathFactory
+ .getAllMetadataPathBefore(table.fileIO(),
earliestMetadataId)
+ .iterator();
+ while (it.hasNext()) {
+ Path path = it.next();
+ table.fileIO().deleteQuietly(path);
+ }
+ }
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index b538a7a606..6835440d84 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -42,15 +42,29 @@ public class IcebergOptions {
.intType()
.defaultValue(10)
.withDescription(
- "Minimum number of Iceberg metadata files to
trigger metadata compaction.");
+ "Minimum number of Iceberg manifest metadata files
to trigger manifest metadata compaction.");
public static final ConfigOption<Integer> COMPACT_MAX_FILE_NUM =
ConfigOptions.key("metadata.iceberg.compaction.max.file-num")
.intType()
.defaultValue(50)
.withDescription(
- "If number of small Iceberg metadata files exceeds
this limit, "
- + "always trigger metadata compaction
regardless of their total size.");
+ "If number of small Iceberg manifest metadata
files exceeds this limit, "
+ + "always trigger manifest metadata
compaction regardless of their total size.");
+
+ public static final ConfigOption<Boolean> METADATA_DELETE_AFTER_COMMIT =
+ key("metadata.iceberg.delete-after-commit.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to delete old metadata files after each
table commit");
+
+ public static final ConfigOption<Integer> METADATA_PREVIOUS_VERSIONS_MAX =
+ key("metadata.iceberg.previous-versions-max")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ "The number of old metadata files to keep after
each table commit");
public static final ConfigOption<String> URI =
key("metadata.iceberg.uri")
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 8cd37bdd1e..679529b9b4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -440,8 +440,9 @@ public class IcebergCompatibilityTest {
write.close();
commit.close();
- // The old metadata.json is removed when the new metadata.json is
created.
- for (int i = 1; i <= 4; i++) {
+ // The old metadata.json is removed when the new metadata.json is
created
+ // depending on the old metadata retention configuration.
+ for (int i = 1; i <= 3; i++) {
unusedFiles.add(pathFactory.toMetadataPath(i).toString());
}
@@ -449,6 +450,11 @@ public class IcebergCompatibilityTest {
assertThat(fileIO.exists(new Path(path))).isFalse();
}
+ // Check existence of retained Iceberg metadata.json files
+ for (int i = 4; i <= 5; i++) {
+ assertThat(fileIO.exists(new
Path(pathFactory.toMetadataPath(i).toString()))).isTrue();
+ }
+
// Test all existing Iceberg snapshots are valid.
assertThat(getIcebergResult())
.containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 21)",
"Record(3, 31)");
@@ -961,6 +967,8 @@ public class IcebergCompatibilityTest {
options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32));
options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 4);
options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 8);
+ options.set(IcebergOptions.METADATA_DELETE_AFTER_COMMIT, true);
+ options.set(IcebergOptions.METADATA_PREVIOUS_VERSIONS_MAX, 1);
options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE,
MemorySize.ofKibiBytes(8));
Schema schema =
new Schema(rowType.getFields(), partitionKeys, primaryKeys,
options.toMap(), "");