This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c24561584 [core][iceberg] fix FileNotFoundException when deleting old metadata files (#5799)
2c24561584 is described below

commit 2c245615847bc5d2e850679eb3f7c640e6781378
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Jun 26 22:15:58 2025 +0800

    [core][iceberg] fix FileNotFoundException when deleting old metadata files (#5799)
---
 .../paimon/iceberg/IcebergCommitCallback.java      |   2 +-
 .../paimon/iceberg/IcebergCompatibilityTest.java   | 120 ++++++++++++---------
 2 files changed, 71 insertions(+), 51 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 9e11b8f2ee..a53f132ead 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -942,8 +942,8 @@ public class IcebergCommitCallback implements CommitCallback, TagCallback {
                 }
                 table.fileIO().deleteQuietly(listPath);
             }
-            deleteApplicableMetadataFiles(snapshotId);
         }
+        deleteApplicableMetadataFiles(snapshotId);
     }
 
     private void deleteApplicableMetadataFiles(long snapshotId) throws IOException {
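
The hunk above hoists deleteApplicableMetadataFiles(snapshotId) out of the
enclosing block, so metadata cleanup now runs exactly once, unconditionally,
after the manifest-list deletion finishes. A minimal self-contained sketch of
that placement change follows; the class name, the guard, the loop, and the
Files.deleteIfExists stand-in are illustrative assumptions, not the verbatim
Paimon source, and only the call placement reflects this commit:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;

    // Sketch only: models the control flow around the relocated call.
    class MetadataExpireSketch {
        void expire(long snapshotId, List<Path> manifestLists) throws IOException {
            if (!manifestLists.isEmpty()) {            // placeholder guard
                for (Path listPath : manifestLists) {  // placeholder loop
                    Files.deleteIfExists(listPath);    // stands in for deleteQuietly
                }
                // Old placement: deleteApplicableMetadataFiles(snapshotId) was
                // called here, inside the guarded block.
            }
            deleteApplicableMetadataFiles(snapshotId); // new placement: always runs once
        }

        void deleteApplicableMetadataFiles(long snapshotId) throws IOException {
            // In Paimon this removes Iceberg metadata files that are no longer needed.
        }
    }

Per the commit title, the old placement could surface a FileNotFoundException
when deleting old metadata files; the test added below reproduces the setup.
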
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 668894f23d..5cb45cbc6f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -25,12 +25,10 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.DataFormatTestUtil;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.FileIO;
@@ -43,7 +41,6 @@ import org.apache.paimon.iceberg.metadata.IcebergMetadata;
 import org.apache.paimon.iceberg.metadata.IcebergRef;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -51,8 +48,6 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeRoot;
@@ -72,8 +67,6 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructLikeSet;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -771,6 +764,76 @@ public class IcebergCompatibilityTest {
         }
     }
 
+    @Test
+    public void testWithIncorrectBase() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        1,
+                        Collections.emptyMap());
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 10));
+        write.write(GenericRow.of(2, 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        write.write(GenericRow.of(1, 11));
+        write.write(GenericRow.of(3, 30));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(2, write.prepareCommit(true, 2));
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 20)", "Record(3, 30)");
+
+        // disable iceberg compatibility
+        Map<String, String> options = new HashMap<>();
+        options.put(IcebergOptions.METADATA_ICEBERG_STORAGE.key(), "disabled");
+        table = table.copy(options);
+        write.close();
+        write = table.newWrite(commitUser);
+        commit.close();
+        commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(4, 40));
+        write.write(GenericRow.of(5, 50));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(3, write.prepareCommit(true, 3));
+        assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(5L);
+
+        // enable iceberg compatibility
+        options.put(
+                IcebergOptions.METADATA_ICEBERG_STORAGE.key(),
+                IcebergOptions.StorageType.TABLE_LOCATION.toString());
+        table = table.copy(options);
+        write.close();
+        write = table.newWrite(commitUser);
+        commit.close();
+        commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(6, 60));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(4, write.prepareCommit(true, 4));
+        assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(7L);
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(1, 11)",
+                        "Record(2, 20)",
+                        "Record(3, 30)",
+                        "Record(4, 40)",
+                        "Record(5, 50)",
+                        "Record(6, 60)");
+
+        write.close();
+        commit.close();
+    }
+
     /*
     Create snapshots
     Create tags
@@ -1166,47 +1229,4 @@ public class IcebergCompatibilityTest {
         result.close();
         return actual;
     }
-
-    private void validateIcebergResult(List<Object[]> expected) throws Exception {
-        HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(), tempDir.toString());
-        TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
-        org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier);
-
-        Types.StructType type = icebergTable.schema().asStruct();
-
-        StructLikeSet actualSet = StructLikeSet.create(type);
-        StructLikeSet expectSet = StructLikeSet.create(type);
-
-        try (CloseableIterable<Record> reader = IcebergGenerics.read(icebergTable).build()) {
-            reader.forEach(actualSet::add);
-        }
-        expectSet.addAll(
-                expected.stream().map(r -> icebergRecord(type, r)).collect(Collectors.toList()));
-
-        assertThat(actualSet).isEqualTo(expectSet);
-    }
-
-    private org.apache.iceberg.data.GenericRecord icebergRecord(
-            Types.StructType type, Object[] row) {
-        org.apache.iceberg.data.GenericRecord record =
-                org.apache.iceberg.data.GenericRecord.create(type);
-        for (int i = 0; i < row.length; i++) {
-            record.set(i, row[i]);
-        }
-        return record;
-    }
-
-    private List<String> getPaimonResult(FileStoreTable paimonTable) throws Exception {
-        List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits();
-        TableRead read = paimonTable.newReadBuilder().newRead();
-        try (RecordReader<InternalRow> recordReader = read.createReader(splits)) {
-            List<String> result = new ArrayList<>();
-            recordReader.forEachRemaining(
-                    row ->
-                            result.add(
-                                    DataFormatTestUtil.toStringWithRowKind(
-                                            row, paimonTable.rowType())));
-            return result;
-        }
-    }
 }
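
The new testWithIncorrectBase covers the case where Iceberg compatibility is
switched off and back on between commits, leaving the stored Iceberg base
metadata out of step with the Paimon snapshots. A condensed sketch of the
option toggling it performs; the literal key and values are assumed to match
what IcebergOptions.METADATA_ICEBERG_STORAGE resolves to in the test, and the
write/commit plumbing is elided:

    import java.util.HashMap;
    import java.util.Map;

    // Condensed sketch of the scenario exercised by testWithIncorrectBase.
    class IcebergToggleSketch {
        public static void main(String[] args) {
            Map<String, String> options = new HashMap<>();
            options.put("metadata.iceberg.storage", "disabled");
            // table = table.copy(options); commits here produce no Iceberg metadata
            options.put("metadata.iceberg.storage", "table-location");
            // table = table.copy(options); later commits regenerate Iceberg
            // metadata, and expiring old snapshots must not assume metadata
            // files exist for every historical snapshot
        }
    }
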
