This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.1 by this push:
new 5216ad5ab6 [core] Forcing rewrite when upgrading level0 in different formats for lookup (#5598)
5216ad5ab6 is described below
commit 5216ad5ab6d318477f29a7a42a2bf8da636db645
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 13 17:48:34 2025 +0800
[core] Forcing rewrite when upgrading level0 in different formats for lookup (#5598)
---
.../compact/LookupMergeTreeCompactRewriter.java | 11 ++++++++++
.../paimon/table/PrimaryKeySimpleTableTest.java | 24 +++++++++++-----------
2 files changed, 23 insertions(+), 12 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 769eec356d..feb6d0b10b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -40,6 +40,8 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
+import java.util.function.IntFunction;
import static
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
import static
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE;
@@ -55,6 +57,7 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite
private final MergeFunctionWrapperFactory<T> wrapperFactory;
private final boolean noSequenceField;
@Nullable private final DeletionVectorsMaintainer dvMaintainer;
+ private final IntFunction<String> level2FileFormat;
public LookupMergeTreeCompactRewriter(
int maxLevel,
@@ -85,6 +88,9 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite
this.lookupLevels = lookupLevels;
this.wrapperFactory = wrapperFactory;
this.noSequenceField = options.sequenceField().isEmpty();
+ String fileFormat = options.fileFormatString();
+ Map<Integer, String> fileFormatPerLevel = options.fileFormatPerLevel();
+ this.level2FileFormat = level ->
fileFormatPerLevel.getOrDefault(level, fileFormat);
}
@Override
@@ -106,6 +112,11 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite
return NO_CHANGELOG_NO_REWRITE;
}
+ // forcing rewriting when upgrading from level 0 to level x with
different file formats
+ if
(!level2FileFormat.apply(file.level()).equals(level2FileFormat.apply(outputLevel)))
{
+ return CHANGELOG_WITH_REWRITE;
+ }
+
// In deletionVector mode, since drop delete is required, when delete
row count > 0 rewrite
// is required.
if (dvMaintainer != null && file.deleteRowCount().map(cnt -> cnt >
0).orElse(true)) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 59dee1092a..76dac80214 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -99,6 +99,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static java.util.Collections.singletonMap;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.CHANGELOG_FILE_FORMAT;
import static org.apache.paimon.CoreOptions.CHANGELOG_FILE_STATS_MODE;
@@ -109,6 +110,7 @@ import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT_PER_LEVEL;
import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
@@ -163,9 +165,7 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
createFileStoreTable(
options -> {
options.set(FILE_FORMAT, format);
- options.set(
- METADATA_STATS_MODE_PER_LEVEL,
- Collections.singletonMap("0", "none"));
+ options.set(METADATA_STATS_MODE_PER_LEVEL,
singletonMap("0", "none"));
});
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
@@ -219,10 +219,7 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
@Test
public void testAsyncReader() throws Exception {
FileStoreTable table = createFileStoreTable();
- table =
- table.copy(
- Collections.singletonMap(
- CoreOptions.FILE_READER_ASYNC_THRESHOLD.key(),
"1 b"));
+ table =
table.copy(singletonMap(CoreOptions.FILE_READER_ASYNC_THRESHOLD.key(), "1 b"));
Map<Integer, GenericRow> rows = new HashMap<>();
for (int i = 0; i < 20; i++) {
@@ -868,6 +865,7 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
conf.set(DELETION_VECTORS_ENABLED, true);
conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
conf.set("file-index.bloom-filter.columns", "b");
+ conf.set(FILE_FORMAT_PER_LEVEL, singletonMap("0",
"avro"));
});
StreamTableWrite write =
@@ -880,13 +878,18 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
write.write(rowData(1, 4, 500L));
commit.commit(0, write.prepareCommit(true, 0));
+ // assert forcing rewriting when upgrading from level 0 to level x
with different file
+ // formats
+ List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().get(0).fileName()).endsWith("parquet");
+
write.write(rowData(1, 5, 100L));
write.write(rowData(1, 6, 600L));
write.write(rowData(1, 7, 400L));
commit.commit(1, write.prepareCommit(true, 1));
PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
- List<Split> splits =
toSplits(table.newSnapshotReader().read().dataSplits());
+ splits = toSplits(table.newSnapshotReader().read().dataSplits());
assertThat(((DataSplit)
splits.get(0)).dataFiles().size()).isEqualTo(2);
TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
@@ -2311,10 +2314,7 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
private void assertReadChangelog(int id, FileStoreTable table) throws
Exception {
// read the changelog at #{id}
- table =
- table.copy(
- Collections.singletonMap(
- CoreOptions.SCAN_SNAPSHOT_ID.key(),
String.valueOf(id)));
+ table = table.copy(singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(),
String.valueOf(id)));
ReadBuilder readBuilder = table.newReadBuilder();
StreamTableScan scan = readBuilder.newStreamScan();