This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2c24561584 [core][iceberg] fix FileNotFoundException when deleting old metadatas (#5799)
2c24561584 is described below
commit 2c245615847bc5d2e850679eb3f7c640e6781378
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Jun 26 22:15:58 2025 +0800
[core][iceberg] fix FileNotFoundException when deleting old metadatas (#5799)
---
.../paimon/iceberg/IcebergCommitCallback.java | 2 +-
.../paimon/iceberg/IcebergCompatibilityTest.java | 120 ++++++++++++---------
2 files changed, 71 insertions(+), 51 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 9e11b8f2ee..a53f132ead 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -942,8 +942,8 @@ public class IcebergCommitCallback implements CommitCallback, TagCallback {
}
table.fileIO().deleteQuietly(listPath);
}
- deleteApplicableMetadataFiles(snapshotId);
}
+ deleteApplicableMetadataFiles(snapshotId);
}
private void deleteApplicableMetadataFiles(long snapshotId) throws IOException {
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 668894f23d..5cb45cbc6f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -25,12 +25,10 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.FileIO;
@@ -43,7 +41,6 @@ import org.apache.paimon.iceberg.metadata.IcebergMetadata;
import org.apache.paimon.iceberg.metadata.IcebergRef;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
-import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -51,8 +48,6 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
@@ -72,8 +67,6 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructLikeSet;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -771,6 +764,76 @@ public class IcebergCompatibilityTest {
}
}
+ @Test
+ public void testWithIncorrectBase() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+ FileStoreTable table =
+ createPaimonTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ 1,
+ Collections.emptyMap());
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 10));
+ write.write(GenericRow.of(2, 20));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ write.write(GenericRow.of(1, 11));
+ write.write(GenericRow.of(3, 30));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(2, write.prepareCommit(true, 2));
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 20)", "Record(3, 30)");
+
+ // disable iceberg compatibility
+ Map<String, String> options = new HashMap<>();
+ options.put(IcebergOptions.METADATA_ICEBERG_STORAGE.key(), "disabled");
+ table = table.copy(options);
+ write.close();
+ write = table.newWrite(commitUser);
+ commit.close();
+ commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(4, 40));
+ write.write(GenericRow.of(5, 50));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(3, write.prepareCommit(true, 3));
+ assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(5L);
+
+ // enable iceberg compatibility
+ options.put(
+ IcebergOptions.METADATA_ICEBERG_STORAGE.key(),
+ IcebergOptions.StorageType.TABLE_LOCATION.toString());
+ table = table.copy(options);
+ write.close();
+ write = table.newWrite(commitUser);
+ commit.close();
+ commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(6, 60));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(4, write.prepareCommit(true, 4));
+ assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(7L);
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder(
+ "Record(1, 11)",
+ "Record(2, 20)",
+ "Record(3, 30)",
+ "Record(4, 40)",
+ "Record(5, 50)",
+ "Record(6, 60)");
+
+ write.close();
+ commit.close();
+ }
+
/*
Create snapshots
Create tags
@@ -1166,47 +1229,4 @@ public class IcebergCompatibilityTest {
result.close();
return actual;
}
-
- private void validateIcebergResult(List<Object[]> expected) throws Exception {
- HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(), tempDir.toString());
- TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
- org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier);
-
- Types.StructType type = icebergTable.schema().asStruct();
-
- StructLikeSet actualSet = StructLikeSet.create(type);
- StructLikeSet expectSet = StructLikeSet.create(type);
-
- try (CloseableIterable<Record> reader = IcebergGenerics.read(icebergTable).build()) {
- reader.forEach(actualSet::add);
- }
- expectSet.addAll(
- expected.stream().map(r -> icebergRecord(type, r)).collect(Collectors.toList()));
-
- assertThat(actualSet).isEqualTo(expectSet);
- }
-
- private org.apache.iceberg.data.GenericRecord icebergRecord(
- Types.StructType type, Object[] row) {
- org.apache.iceberg.data.GenericRecord record =
- org.apache.iceberg.data.GenericRecord.create(type);
- for (int i = 0; i < row.length; i++) {
- record.set(i, row[i]);
- }
- return record;
- }
-
- private List<String> getPaimonResult(FileStoreTable paimonTable) throws Exception {
- List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits();
- TableRead read = paimonTable.newReadBuilder().newRead();
- try (RecordReader<InternalRow> recordReader = read.createReader(splits)) {
- List<String> result = new ArrayList<>();
- recordReader.forEachRemaining(
- row ->
- result.add(
- DataFormatTestUtil.toStringWithRowKind(
- row, paimonTable.rowType())));
- return result;
- }
- }
}