This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/release-1.1 by this push:
     new 5216ad5ab6 [core] Forcing rewrite when upgrading level0 in different formats for lookup (#5598)
5216ad5ab6 is described below

commit 5216ad5ab6d318477f29a7a42a2bf8da636db645
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 13 17:48:34 2025 +0800

    [core] Forcing rewrite when upgrading level0 in different formats for lookup (#5598)
---
 .../compact/LookupMergeTreeCompactRewriter.java    | 11 ++++++++++
 .../paimon/table/PrimaryKeySimpleTableTest.java    | 24 +++++++++++-----------
 2 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 769eec356d..feb6d0b10b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -40,6 +40,8 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
+import java.util.function.IntFunction;
 
 import static 
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
 import static 
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE;
@@ -55,6 +57,7 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
     private final MergeFunctionWrapperFactory<T> wrapperFactory;
     private final boolean noSequenceField;
     @Nullable private final DeletionVectorsMaintainer dvMaintainer;
+    private final IntFunction<String> level2FileFormat;
 
     public LookupMergeTreeCompactRewriter(
             int maxLevel,
@@ -85,6 +88,9 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
         this.lookupLevels = lookupLevels;
         this.wrapperFactory = wrapperFactory;
         this.noSequenceField = options.sequenceField().isEmpty();
+        String fileFormat = options.fileFormatString();
+        Map<Integer, String> fileFormatPerLevel = options.fileFormatPerLevel();
+        this.level2FileFormat = level -> 
fileFormatPerLevel.getOrDefault(level, fileFormat);
     }
 
     @Override
@@ -106,6 +112,11 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
             return NO_CHANGELOG_NO_REWRITE;
         }
 
+        // forcing rewriting when upgrading from level 0 to level x with 
different file formats
+        if 
(!level2FileFormat.apply(file.level()).equals(level2FileFormat.apply(outputLevel)))
 {
+            return CHANGELOG_WITH_REWRITE;
+        }
+
         // In deletionVector mode, since drop delete is required, when delete 
row count > 0 rewrite
         // is required.
         if (dvMaintainer != null && file.deleteRowCount().map(cnt -> cnt > 
0).orElse(true)) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 59dee1092a..76dac80214 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -99,6 +99,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonMap;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.CHANGELOG_FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.CHANGELOG_FILE_STATS_MODE;
@@ -109,6 +110,7 @@ import static 
org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
 import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT_PER_LEVEL;
 import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
@@ -163,9 +165,7 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
                 createFileStoreTable(
                         options -> {
                             options.set(FILE_FORMAT, format);
-                            options.set(
-                                    METADATA_STATS_MODE_PER_LEVEL,
-                                    Collections.singletonMap("0", "none"));
+                            options.set(METADATA_STATS_MODE_PER_LEVEL, 
singletonMap("0", "none"));
                         });
 
         BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
@@ -219,10 +219,7 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
     @Test
     public void testAsyncReader() throws Exception {
         FileStoreTable table = createFileStoreTable();
-        table =
-                table.copy(
-                        Collections.singletonMap(
-                                CoreOptions.FILE_READER_ASYNC_THRESHOLD.key(), 
"1 b"));
+        table = 
table.copy(singletonMap(CoreOptions.FILE_READER_ASYNC_THRESHOLD.key(), "1 b"));
 
         Map<Integer, GenericRow> rows = new HashMap<>();
         for (int i = 0; i < 20; i++) {
@@ -868,6 +865,7 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
                             conf.set(DELETION_VECTORS_ENABLED, true);
                             conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
                             conf.set("file-index.bloom-filter.columns", "b");
+                            conf.set(FILE_FORMAT_PER_LEVEL, singletonMap("0", 
"avro"));
                         });
 
         StreamTableWrite write =
@@ -880,13 +878,18 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
         write.write(rowData(1, 4, 500L));
         commit.commit(0, write.prepareCommit(true, 0));
 
+        // assert forcing rewriting when upgrading from level 0 to level x 
with different file
+        // formats
+        List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().get(0).fileName()).endsWith("parquet");
+
         write.write(rowData(1, 5, 100L));
         write.write(rowData(1, 6, 600L));
         write.write(rowData(1, 7, 400L));
         commit.commit(1, write.prepareCommit(true, 1));
 
         PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
-        List<Split> splits = 
toSplits(table.newSnapshotReader().read().dataSplits());
+        splits = toSplits(table.newSnapshotReader().read().dataSplits());
         assertThat(((DataSplit) 
splits.get(0)).dataFiles().size()).isEqualTo(2);
         TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
         assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
@@ -2311,10 +2314,7 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
 
     private void assertReadChangelog(int id, FileStoreTable table) throws 
Exception {
         // read the changelog at #{id}
-        table =
-                table.copy(
-                        Collections.singletonMap(
-                                CoreOptions.SCAN_SNAPSHOT_ID.key(), 
String.valueOf(id)));
+        table = table.copy(singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), 
String.valueOf(id)));
         ReadBuilder readBuilder = table.newReadBuilder();
         StreamTableScan scan = readBuilder.newStreamScan();
 

Reply via email to