This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d9039d97d4 [core] Introduce 'lookup.remote-file.level-threshold' to let high levels use remote only (#6614)
d9039d97d4 is described below

commit d9039d97d4c3baa26ad98e7a27d0723105dd6fda
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 17 23:25:41 2025 +0800

    [core] Introduce 'lookup.remote-file.level-threshold' to let high levels use remote only (#6614)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 12 ++++
 .../mergetree/compact/RemoteLookupFileManager.java |  9 ++-
 .../paimon/operation/KeyValueFileStoreWrite.java   |  3 +-
 ...bleTest.java => LookupRemoteFileTableTest.java} | 64 +++++++++++++++++++++-
 5 files changed, 89 insertions(+), 5 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 0698b72f0f..c3251aa979 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -723,6 +723,12 @@ Mainly to resolve data skew on primary keys. We recommend 
starting with 64 mb wh
             <td>Boolean</td>
             <td>Whether to enable the remote file for lookup.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.remote-file.level-threshold</h5></td>
+            <td style="word-wrap: break-word;">-2147483648</td>
+            <td>Integer</td>
+            <td>Level threshold of lookup to generate remote lookup files. 
Level files below this threshold will not generate remote lookup files.</td>
+        </tr>
         <tr>
             <td><h5>manifest.compression</h5></td>
             <td style="word-wrap: break-word;">"zstd"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 464082cf6e..ccb490ec41 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1185,6 +1185,14 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether to enable the remote file for 
lookup.");
 
+    public static final ConfigOption<Integer> LOOKUP_REMOTE_LEVEL_THRESHOLD =
+            key("lookup.remote-file.level-threshold")
+                    .intType()
+                    .defaultValue(Integer.MIN_VALUE)
+                    .withDescription(
+                            "Level threshold of lookup to generate remote 
lookup files. "
+                                    + "Level files below this threshold will 
not generate remote lookup files.");
+
     public static final ConfigOption<Integer> READ_BATCH_SIZE =
             key("read.batch-size")
                     .intType()
@@ -2514,6 +2522,10 @@ public class CoreOptions implements Serializable {
         return options.get(LOOKUP_REMOTE_FILE_ENABLED);
     }
 
+    public int lookupRemoteLevelThreshold() {
+        return options.get(LOOKUP_REMOTE_LEVEL_THRESHOLD);
+    }
+
     public double lookupCacheHighPrioPoolRatio() {
         return options.get(LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
index fa2c2c51f9..0751d99cd8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
@@ -49,6 +49,7 @@ public class RemoteLookupFileManager<T> implements 
RemoteFileDownloader {
     private final DataFilePathFactory pathFactory;
     private final TableSchema schema;
     private final LookupLevels<T> lookupLevels;
+    private final int levelThreshold;
     private final SchemaManager schemaManager;
     private final Map<Long, RowType> schemaRowTypes;
 
@@ -57,17 +58,23 @@ public class RemoteLookupFileManager<T> implements 
RemoteFileDownloader {
             DataFilePathFactory pathFactory,
             TableSchema schema,
             LookupLevels<T> lookupLevels,
-            SchemaManager schemaManager) {
+            SchemaManager schemaManager,
+            int levelThreshold) {
         this.fileIO = fileIO;
         this.pathFactory = pathFactory;
         this.schema = schema;
         this.lookupLevels = lookupLevels;
+        this.levelThreshold = levelThreshold;
         this.lookupLevels.setRemoteFileDownloader(this);
         this.schemaManager = schemaManager;
         this.schemaRowTypes = new HashMap<>();
     }
 
     public DataFileMeta genRemoteLookupFile(DataFileMeta file) throws 
IOException {
+        if (file.level() < levelThreshold) {
+            return file;
+        }
+
         String remoteSstName = lookupLevels.remoteSstName(file.fileName());
         if (file.extraFiles().contains(remoteSstName)) {
             // ignore existed
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 4c85ca7f0a..8e4d8cb7a1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -381,7 +381,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                                 keyReaderFactory.pathFactory(),
                                 keyReaderFactory.schema(),
                                 lookupLevels,
-                                schemaManager);
+                                schemaManager,
+                                options.lookupRemoteLevelThreshold());
             }
             return new LookupMergeTreeCompactRewriter(
                     maxLevel,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupRemoteFileTableTest.java
similarity index 71%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
rename to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupRemoteFileTableTest.java
index 0d07826f99..61f3b65a85 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupRemoteFileTableTest.java
@@ -53,8 +53,8 @@ import java.util.List;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test for {@link LookupTable} with deletion vectors. */
-public class DeletionVectorsTableTest extends TableTestBase {
+/** Test for {@link LookupTable} with remote files. */
+public class LookupRemoteFileTableTest extends TableTestBase {
 
     @TempDir java.nio.file.Path tempDir;
     private IOManager ioManager;
@@ -79,7 +79,7 @@ public class DeletionVectorsTableTest extends TableTestBase {
         innerTestRemoteFile(true, true);
     }
 
-    public void innerTestRemoteFile(boolean schemaEvolution, boolean 
notCompatible)
+    private void innerTestRemoteFile(boolean schemaEvolution, boolean 
notCompatible)
             throws Exception {
         Options options = new Options();
         options.set(CoreOptions.BUCKET, 1);
@@ -183,4 +183,62 @@ public class DeletionVectorsTableTest extends 
TableTestBase {
                         GenericRow.of(4, 1),
                         GenericRow.of(5, 1));
     }
+
+    @Test
+    public void testRemoteFileLevelThreshold() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
+        options.set(CoreOptions.LOOKUP_REMOTE_FILE_ENABLED, true);
+        options.set(CoreOptions.LOOKUP_REMOTE_LEVEL_THRESHOLD, 5);
+        Identifier identifier = new Identifier("default", "t");
+        Schema schema =
+                new Schema(
+                        RowType.of(new IntType(), new IntType()).getFields(),
+                        Collections.emptyList(),
+                        Collections.singletonList("f0"),
+                        options.toMap(),
+                        null);
+        catalog.createTable(identifier, schema, false);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+        // first write
+        try (BatchTableWrite write = 
writeBuilder.newWrite().withIOManager(ioManager);
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(1, 1));
+            write.write(GenericRow.of(2, 1));
+            write.write(GenericRow.of(3, 1));
+            commit.commit(write.prepareCommit());
+        }
+
+        // second write
+        try (BatchTableWrite write = 
writeBuilder.newWrite().withIOManager(ioManager);
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(1, 1));
+            write.write(GenericRow.of(4, 1));
+            write.write(GenericRow.of(5, 1));
+            commit.commit(write.prepareCommit());
+        }
+
+        // third write generate level 4
+        table = 
table.copy(Collections.singletonMap(CoreOptions.COMPACTION_SIZE_RATIO.key(), 
"0"));
+        writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = 
writeBuilder.newWrite().withIOManager(ioManager);
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(1, 2, 2));
+            write.write(GenericRow.of(2, 2, 2));
+            commit.commit(write.prepareCommit());
+        }
+
+        // assert level threshold
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        assertThat(splits).hasSize(1);
+        List<DataFileMeta> files = ((DataSplit) splits.get(0)).dataFiles();
+        DataFileMeta level5 = files.stream().filter(f -> f.level() == 
5).findFirst().get();
+        DataFileMeta level4 = files.stream().filter(f -> f.level() == 
4).findFirst().get();
+        assertThat(level5.extraFiles()).hasSize(1);
+        assertThat(level4.extraFiles()).hasSize(0);
+    }
 }

Reply via email to