This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d9039d97d4 [core] Introduce 'lookup.remote-file.level-threshold' to
let high levels use remote only (#6614)
d9039d97d4 is described below
commit d9039d97d4c3baa26ad98e7a27d0723105dd6fda
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 17 23:25:41 2025 +0800
[core] Introduce 'lookup.remote-file.level-threshold' to let high levels
use remote only (#6614)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 12 ++++
.../mergetree/compact/RemoteLookupFileManager.java | 9 ++-
.../paimon/operation/KeyValueFileStoreWrite.java | 3 +-
...bleTest.java => LookupRemoteFileTableTest.java} | 64 +++++++++++++++++++++-
5 files changed, 89 insertions(+), 5 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 0698b72f0f..c3251aa979 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -723,6 +723,12 @@ Mainly to resolve data skew on primary keys. We recommend
starting with 64 mb wh
<td>Boolean</td>
<td>Whether to enable the remote file for lookup.</td>
</tr>
+ <tr>
+ <td><h5>lookup.remote-file.level-threshold</h5></td>
+ <td style="word-wrap: break-word;">-2147483648</td>
+ <td>Integer</td>
+ <td>Level threshold of lookup to generate remote lookup files.
Level files below this threshold will not generate remote lookup files.</td>
+ </tr>
<tr>
<td><h5>manifest.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 464082cf6e..ccb490ec41 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1185,6 +1185,14 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether to enable the remote file for
lookup.");
+ public static final ConfigOption<Integer> LOOKUP_REMOTE_LEVEL_THRESHOLD =
+ key("lookup.remote-file.level-threshold")
+ .intType()
+ .defaultValue(Integer.MIN_VALUE)
+ .withDescription(
+ "Level threshold of lookup to generate remote
lookup files. "
+ + "Level files below this threshold will
not generate remote lookup files.");
+
public static final ConfigOption<Integer> READ_BATCH_SIZE =
key("read.batch-size")
.intType()
@@ -2514,6 +2522,10 @@ public class CoreOptions implements Serializable {
return options.get(LOOKUP_REMOTE_FILE_ENABLED);
}
+ public int lookupRemoteLevelThreshold() {
+ return options.get(LOOKUP_REMOTE_LEVEL_THRESHOLD);
+ }
+
public double lookupCacheHighPrioPoolRatio() {
return options.get(LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
index fa2c2c51f9..0751d99cd8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
@@ -49,6 +49,7 @@ public class RemoteLookupFileManager<T> implements
RemoteFileDownloader {
private final DataFilePathFactory pathFactory;
private final TableSchema schema;
private final LookupLevels<T> lookupLevels;
+ private final int levelThreshold;
private final SchemaManager schemaManager;
private final Map<Long, RowType> schemaRowTypes;
@@ -57,17 +58,23 @@ public class RemoteLookupFileManager<T> implements
RemoteFileDownloader {
DataFilePathFactory pathFactory,
TableSchema schema,
LookupLevels<T> lookupLevels,
- SchemaManager schemaManager) {
+ SchemaManager schemaManager,
+ int levelThreshold) {
this.fileIO = fileIO;
this.pathFactory = pathFactory;
this.schema = schema;
this.lookupLevels = lookupLevels;
+ this.levelThreshold = levelThreshold;
this.lookupLevels.setRemoteFileDownloader(this);
this.schemaManager = schemaManager;
this.schemaRowTypes = new HashMap<>();
}
public DataFileMeta genRemoteLookupFile(DataFileMeta file) throws
IOException {
+ if (file.level() < levelThreshold) {
+ return file;
+ }
+
String remoteSstName = lookupLevels.remoteSstName(file.fileName());
if (file.extraFiles().contains(remoteSstName)) {
// ignore existed
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 4c85ca7f0a..8e4d8cb7a1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -381,7 +381,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
keyReaderFactory.pathFactory(),
keyReaderFactory.schema(),
lookupLevels,
- schemaManager);
+ schemaManager,
+ options.lookupRemoteLevelThreshold());
}
return new LookupMergeTreeCompactRewriter(
maxLevel,
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupRemoteFileTableTest.java
similarity index 71%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupRemoteFileTableTest.java
index 0d07826f99..61f3b65a85 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupRemoteFileTableTest.java
@@ -53,8 +53,8 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Test for {@link LookupTable} with deletion vectors. */
-public class DeletionVectorsTableTest extends TableTestBase {
+/** Test for {@link LookupTable} with remote files. */
+public class LookupRemoteFileTableTest extends TableTestBase {
@TempDir java.nio.file.Path tempDir;
private IOManager ioManager;
@@ -79,7 +79,7 @@ public class DeletionVectorsTableTest extends TableTestBase {
innerTestRemoteFile(true, true);
}
- public void innerTestRemoteFile(boolean schemaEvolution, boolean
notCompatible)
+ private void innerTestRemoteFile(boolean schemaEvolution, boolean
notCompatible)
throws Exception {
Options options = new Options();
options.set(CoreOptions.BUCKET, 1);
@@ -183,4 +183,62 @@ public class DeletionVectorsTableTest extends
TableTestBase {
GenericRow.of(4, 1),
GenericRow.of(5, 1));
}
+
+ @Test
+ public void testRemoteFileLevelThreshold() throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
+ options.set(CoreOptions.LOOKUP_REMOTE_FILE_ENABLED, true);
+ options.set(CoreOptions.LOOKUP_REMOTE_LEVEL_THRESHOLD, 5);
+ Identifier identifier = new Identifier("default", "t");
+ Schema schema =
+ new Schema(
+ RowType.of(new IntType(), new IntType()).getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("f0"),
+ options.toMap(),
+ null);
+ catalog.createTable(identifier, schema, false);
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+ // first write
+ try (BatchTableWrite write =
writeBuilder.newWrite().withIOManager(ioManager);
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(1, 1));
+ write.write(GenericRow.of(2, 1));
+ write.write(GenericRow.of(3, 1));
+ commit.commit(write.prepareCommit());
+ }
+
+ // second write
+ try (BatchTableWrite write =
writeBuilder.newWrite().withIOManager(ioManager);
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(1, 1));
+ write.write(GenericRow.of(4, 1));
+ write.write(GenericRow.of(5, 1));
+ commit.commit(write.prepareCommit());
+ }
+
+ // third write with compaction size ratio 0, expected to compact into a level-4 file
+ table =
table.copy(Collections.singletonMap(CoreOptions.COMPACTION_SIZE_RATIO.key(),
"0"));
+ writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write =
writeBuilder.newWrite().withIOManager(ioManager);
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(1, 2));
+ write.write(GenericRow.of(2, 2));
+ commit.commit(write.prepareCommit());
+ }
+
+ // threshold is 5: the level-5 file gets a remote lookup file, the level-4 file does not
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ assertThat(splits).hasSize(1);
+ List<DataFileMeta> files = ((DataSplit) splits.get(0)).dataFiles();
+ DataFileMeta level5 = files.stream().filter(f -> f.level() ==
5).findFirst().get();
+ DataFileMeta level4 = files.stream().filter(f -> f.level() ==
4).findFirst().get();
+ assertThat(level5.extraFiles()).hasSize(1);
+ assertThat(level4.extraFiles()).isEmpty();
+ }
}