This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new da6e72052c [core] Introduce 'lookup.remote-file.enabled' to optimize
lookup (#6533)
da6e72052c is described below
commit da6e72052cc7693dca75b277eae5acb206b32e29
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Nov 6 11:11:36 2025 +0800
[core] Introduce 'lookup.remote-file.enabled' to optimize lookup (#6533)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 10 ++
.../paimon/io/KeyValueFileReaderFactory.java | 8 ++
.../org/apache/paimon/mergetree/LookupFile.java | 16 ++-
.../org/apache/paimon/mergetree/LookupLevels.java | 113 +++++++++++-----
.../compact/ChangelogMergeTreeRewriter.java | 1 +
.../compact/LookupMergeTreeCompactRewriter.java | 24 +++-
.../compact/MergeTreeCompactRewriter.java | 8 +-
.../mergetree/compact/RemoteLookupFileManager.java | 105 +++++++++++++++
.../paimon/operation/KeyValueFileStoreWrite.java | 39 ++++--
.../paimon/table/format/FormatTableWrite.java | 2 +-
.../apache/paimon/table/query/LocalTableQuery.java | 3 +-
.../apache/paimon/table/sink/BatchTableWrite.java | 4 +
.../paimon/mergetree/ContainsLevelsTest.java | 2 +-
.../apache/paimon/mergetree/LookupLevelsTest.java | 2 +-
.../flink/lookup/DeletionVectorsTableTest.java | 148 +++++++++++++++++++++
16 files changed, 436 insertions(+), 55 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 0b7fcca75f..3e2b78d3fb 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -705,6 +705,12 @@ Mainly to resolve data skew on primary keys. We recommend
starting with 64 mb wh
<td>Integer</td>
<td>Threshold for merging records to binary buffer in lookup.</td>
</tr>
+ <tr>
+ <td><h5>lookup.remote-file.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to enable the remote file for lookup.</td>
+ </tr>
<tr>
<td><h5>manifest.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index c90760183d..7f8bc8728a 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1177,6 +1177,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Define the default false positive probability for
lookup cache bloom filters.");
+ public static final ConfigOption<Boolean> LOOKUP_REMOTE_FILE_ENABLED =
+ key("lookup.remote-file.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to enable the remote file for
lookup.");
+
public static final ConfigOption<Integer> READ_BATCH_SIZE =
key("read.batch-size")
.intType()
@@ -2463,6 +2469,10 @@ public class CoreOptions implements Serializable {
return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
}
+ public boolean lookupRemoteFileEnabled() {
+ return options.get(LOOKUP_REMOTE_FILE_ENABLED);
+ }
+
public double lookupCacheHighPrioPoolRatio() {
return options.get(LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 0b28bc0b4e..8459c97516 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -95,6 +95,14 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
this.dvFactory = dvFactory;
}
+ public TableSchema schema() {
+ return schema;
+ }
+
+ public DataFilePathFactory pathFactory() {
+ return pathFactory;
+ }
+
@Override
public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws
IOException {
if (file.fileSize() >= asyncThreshold &&
file.fileName().endsWith(".orc")) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
index 7469684dac..52d669bf6e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
@@ -19,7 +19,6 @@
package org.apache.paimon.mergetree;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.lookup.LookupStoreReader;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.types.RowType;
@@ -49,7 +48,7 @@ public class LookupFile {
private static final Logger LOG =
LoggerFactory.getLogger(LookupFile.class);
private final File localFile;
- private final DataFileMeta remoteFile;
+ private final int level;
private final LookupStoreReader reader;
private final Runnable callback;
@@ -57,14 +56,17 @@ public class LookupFile {
private long hitCount;
private boolean isClosed = false;
- public LookupFile(
- File localFile, DataFileMeta remoteFile, LookupStoreReader reader,
Runnable callback) {
+ public LookupFile(File localFile, int level, LookupStoreReader reader,
Runnable callback) {
this.localFile = localFile;
- this.remoteFile = remoteFile;
+ this.level = level;
this.reader = reader;
this.callback = callback;
}
+ public File localFile() {
+ return localFile;
+ }
+
@Nullable
public byte[] get(byte[] key) throws IOException {
checkArgument(!isClosed);
@@ -76,8 +78,8 @@ public class LookupFile {
return res;
}
- public DataFileMeta remoteFile() {
- return remoteFile;
+ public int level() {
+ return level;
}
public boolean isClosed() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index f7904af7ea..f90e22cd10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -55,23 +55,27 @@ import static
org.apache.paimon.utils.VarLengthIntUtils.encodeLong;
/** Provide lookup by key. */
public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
+ public static final String CURRENT_VERSION = "v1";
+ public static final String REMOTE_LOOKUP_FILE_SUFFIX = ".lookup";
+
private final Levels levels;
private final Comparator<InternalRow> keyComparator;
private final RowCompactedSerializer keySerializer;
- private final ValueProcessor<T> valueProcessor;
+ private final PersistProcessor<T> persistProcessor;
private final IOFunction<DataFileMeta, RecordReader<KeyValue>>
fileReaderFactory;
private final Function<String, File> localFileFactory;
private final LookupStoreFactory lookupStoreFactory;
private final Function<Long, BloomFilter.Builder> bfGenerator;
-
private final Cache<String, LookupFile> lookupFileCache;
private final Set<String> ownCachedFiles;
+ @Nullable private RemoteFileDownloader remoteFileDownloader;
+
public LookupLevels(
Levels levels,
Comparator<InternalRow> keyComparator,
RowType keyType,
- ValueProcessor<T> valueProcessor,
+ PersistProcessor<T> persistProcessor,
IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
Function<String, File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
@@ -80,7 +84,7 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
this.levels = levels;
this.keyComparator = keyComparator;
this.keySerializer = new RowCompactedSerializer(keyType);
- this.valueProcessor = valueProcessor;
+ this.persistProcessor = persistProcessor;
this.fileReaderFactory = fileReaderFactory;
this.localFileFactory = localFileFactory;
this.lookupStoreFactory = lookupStoreFactory;
@@ -90,6 +94,10 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
levels.addDropFileCallback(this);
}
+ public void setRemoteFileDownloader(@Nullable RemoteFileDownloader
remoteFileDownloader) {
+ this.remoteFileDownloader = remoteFileDownloader;
+ }
+
public Levels getLevels() {
return levels;
}
@@ -140,33 +148,51 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
valueBytes = lookupFile.get(keyBytes);
} finally {
if (newCreatedLookupFile) {
- lookupFileCache.put(file.fileName(), lookupFile);
+ addLocalFile(file, lookupFile);
}
}
if (valueBytes == null) {
return null;
}
- return valueProcessor.readFromDisk(
- key, lookupFile.remoteFile().level(), valueBytes,
file.fileName());
+ return persistProcessor.readFromDisk(key, lookupFile.level(),
valueBytes, file.fileName());
}
- private LookupFile createLookupFile(DataFileMeta file) throws IOException {
+ public LookupFile createLookupFile(DataFileMeta file) throws IOException {
File localFile = localFileFactory.apply(file.fileName());
if (!localFile.createNewFile()) {
throw new IOException("Can not create new file: " + localFile);
}
- LookupStoreWriter kvWriter =
- lookupStoreFactory.createWriter(localFile,
bfGenerator.apply(file.rowCount()));
- try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
+
+ if (remoteFileDownloader == null ||
!remoteFileDownloader.tryToDownload(file, localFile)) {
+ createSstFileFromDataFile(file, localFile);
+ }
+
+ ownCachedFiles.add(file.fileName());
+ return new LookupFile(
+ localFile,
+ file.level(),
+ lookupStoreFactory.createReader(localFile),
+ () -> ownCachedFiles.remove(file.fileName()));
+ }
+
+ public void addLocalFile(DataFileMeta file, LookupFile lookupFile) {
+ lookupFileCache.put(file.fileName(), lookupFile);
+ }
+
+ private void createSstFileFromDataFile(DataFileMeta file, File localFile)
throws IOException {
+ try (LookupStoreWriter kvWriter =
+ lookupStoreFactory.createWriter(
+ localFile, bfGenerator.apply(file.rowCount()));
+ RecordReader<KeyValue> reader = fileReaderFactory.apply(file))
{
KeyValue kv;
- if (valueProcessor.withPosition()) {
+ if (persistProcessor.withPosition()) {
FileRecordIterator<KeyValue> batch;
while ((batch = (FileRecordIterator<KeyValue>)
reader.readBatch()) != null) {
while ((kv = batch.next()) != null) {
byte[] keyBytes =
keySerializer.serializeToBytes(kv.key());
byte[] valueBytes =
- valueProcessor.persistToDisk(kv,
batch.returnedPosition());
+ persistProcessor.persistToDisk(kv,
batch.returnedPosition());
kvWriter.put(keyBytes, valueBytes);
}
batch.releaseBatch();
@@ -176,7 +202,7 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
while ((batch = reader.readBatch()) != null) {
while ((kv = batch.next()) != null) {
byte[] keyBytes =
keySerializer.serializeToBytes(kv.key());
- byte[] valueBytes = valueProcessor.persistToDisk(kv);
+ byte[] valueBytes = persistProcessor.persistToDisk(kv);
kvWriter.put(keyBytes, valueBytes);
}
batch.releaseBatch();
@@ -185,16 +211,16 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
} catch (IOException e) {
FileIOUtils.deleteFileOrDirectory(localFile);
throw e;
- } finally {
- kvWriter.close();
}
+ }
- ownCachedFiles.add(file.fileName());
- return new LookupFile(
- localFile,
- file,
- lookupStoreFactory.createReader(localFile),
- () -> ownCachedFiles.remove(file.fileName()));
+ public String remoteSstName(String dataFileName) {
+ return dataFileName
+ + "."
+ + persistProcessor.identifier()
+ + "."
+ + CURRENT_VERSION
+ + REMOTE_LOOKUP_FILE_SUFFIX;
}
@Override
@@ -206,7 +232,9 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
}
/** Processor to process value. */
- public interface ValueProcessor<T> {
+ public interface PersistProcessor<T> {
+
+ String identifier();
boolean withPosition();
@@ -219,15 +247,20 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
T readFromDisk(InternalRow key, int level, byte[] valueBytes, String
fileName);
}
- /** A {@link ValueProcessor} to return {@link KeyValue}. */
- public static class KeyValueProcessor implements ValueProcessor<KeyValue> {
+ /** A {@link PersistProcessor} to return {@link KeyValue}. */
+ public static class PersistValueProcessor implements
PersistProcessor<KeyValue> {
private final RowCompactedSerializer valueSerializer;
- public KeyValueProcessor(RowType valueType) {
+ public PersistValueProcessor(RowType valueType) {
this.valueSerializer = new RowCompactedSerializer(valueType);
}
+ @Override
+ public String identifier() {
+ return "value";
+ }
+
@Override
public boolean withPosition() {
return false;
@@ -253,11 +286,16 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
}
}
- /** A {@link ValueProcessor} to return {@link Boolean} only. */
- public static class ContainsValueProcessor implements
ValueProcessor<Boolean> {
+ /** A {@link PersistProcessor} to return {@link Boolean} only. */
+ public static class PersistEmptyProcessor implements
PersistProcessor<Boolean> {
private static final byte[] EMPTY_BYTES = new byte[0];
+ @Override
+ public String identifier() {
+ return "empty";
+ }
+
@Override
public boolean withPosition() {
return false;
@@ -274,16 +312,22 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
}
}
- /** A {@link ValueProcessor} to return {@link PositionedKeyValue}. */
- public static class PositionedKeyValueProcessor implements
ValueProcessor<PositionedKeyValue> {
+ /** A {@link PersistProcessor} to return {@link PositionedKeyValue}. */
+ public static class PersistPositionProcessor implements
PersistProcessor<PositionedKeyValue> {
+
private final boolean persistValue;
private final RowCompactedSerializer valueSerializer;
- public PositionedKeyValueProcessor(RowType valueType, boolean
persistValue) {
+ public PersistPositionProcessor(RowType valueType, boolean
persistValue) {
this.persistValue = persistValue;
this.valueSerializer = persistValue ? new
RowCompactedSerializer(valueType) : null;
}
+ @Override
+ public String identifier() {
+ return persistValue ? "position-and-value" : "position";
+ }
+
@Override
public boolean withPosition() {
return true;
@@ -334,6 +378,7 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
/** {@link KeyValue} with file name and row position for DeletionVector. */
public static class PositionedKeyValue {
+
private final @Nullable KeyValue keyValue;
private final String fileName;
private final long rowPosition;
@@ -357,4 +402,10 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
return keyValue;
}
}
+
+ /** Downloader to try to download remote lookup file to local. */
+ public interface RemoteFileDownloader {
+
+ boolean tryToDownload(DataFileMeta dataFile, File localFile);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 56c61d8743..4ef71757c3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -184,6 +184,7 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
if (rewriteCompactFile) {
notifyRewriteCompactBefore(before);
+ after = notifyRewriteCompactAfter(after);
}
List<DataFileMeta> changelogFiles =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 896fba0d60..be038a2123 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -38,6 +38,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -59,6 +60,8 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
@Nullable private final BucketedDvMaintainer dvMaintainer;
private final IntFunction<String> level2FileFormat;
+ @Nullable private final RemoteLookupFileManager<T> remoteLookupFileManager;
+
public LookupMergeTreeCompactRewriter(
int maxLevel,
MergeEngine mergeEngine,
@@ -72,7 +75,8 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
MergeFunctionWrapperFactory<T> wrapperFactory,
boolean produceChangelog,
@Nullable BucketedDvMaintainer dvMaintainer,
- CoreOptions options) {
+ CoreOptions options,
+ @Nullable RemoteLookupFileManager<T> remoteLookupFileManager) {
super(
maxLevel,
mergeEngine,
@@ -91,6 +95,7 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
String fileFormat = options.fileFormatString();
Map<Integer, String> fileFormatPerLevel = options.fileFormatPerLevel();
this.level2FileFormat = level ->
fileFormatPerLevel.getOrDefault(level, fileFormat);
+ this.remoteLookupFileManager = remoteLookupFileManager;
}
@Override
@@ -100,6 +105,23 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
}
}
+ @Override
+ protected List<DataFileMeta> notifyRewriteCompactAfter(List<DataFileMeta>
files) {
+ if (remoteLookupFileManager == null) {
+ return files;
+ }
+
+ List<DataFileMeta> result = new ArrayList<>();
+ for (DataFileMeta file : files) {
+ try {
+ result.add(remoteLookupFileManager.genRemoteLookupFile(file));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return result;
+ }
+
@Override
protected boolean rewriteChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index df299fd84c..f235826187 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -104,7 +104,9 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
List<DataFileMeta> before = extractFilesFromSections(sections);
notifyRewriteCompactBefore(before);
- return new CompactResult(before, writer.result());
+ List<DataFileMeta> after = writer.result();
+ after = notifyRewriteCompactAfter(after);
+ return new CompactResult(before, after);
}
protected <T> RecordReader<T> readerForMergeTree(
@@ -120,4 +122,8 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
}
protected void notifyRewriteCompactBefore(List<DataFileMeta> files) {}
+
+ protected List<DataFileMeta> notifyRewriteCompactAfter(List<DataFileMeta>
files) {
+ return files;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
new file mode 100644
index 0000000000..23aed652c8
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.mergetree.LookupFile;
+import org.apache.paimon.mergetree.LookupLevels;
+import org.apache.paimon.mergetree.LookupLevels.RemoteFileDownloader;
+import org.apache.paimon.schema.TableSchema;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Manager to manage remote files for lookup. */
+public class RemoteLookupFileManager<T> implements RemoteFileDownloader {
+
+ private final FileIO fileIO;
+ private final DataFilePathFactory pathFactory;
+ private final TableSchema schema;
+ private final LookupLevels<T> lookupLevels;
+
+ public RemoteLookupFileManager(
+ FileIO fileIO,
+ DataFilePathFactory pathFactory,
+ TableSchema schema,
+ LookupLevels<T> lookupLevels) {
+ this.fileIO = fileIO;
+ this.pathFactory = pathFactory;
+ this.schema = schema;
+ this.lookupLevels = lookupLevels;
+ this.lookupLevels.setRemoteFileDownloader(this);
+ }
+
+ public DataFileMeta genRemoteLookupFile(DataFileMeta file) throws
IOException {
+ String remoteSstName = lookupLevels.remoteSstName(file.fileName());
+ if (file.extraFiles().contains(remoteSstName)) {
+ // ignore existed
+ return file;
+ }
+
+ Path sstFile = remoteSstPath(file, remoteSstName);
+ LookupFile lookupFile = lookupLevels.createLookupFile(file);
+ try (FileInputStream is = new FileInputStream(lookupFile.localFile());
+ PositionOutputStream os = fileIO.newOutputStream(sstFile,
false)) {
+ IOUtils.copy(is, os);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ lookupLevels.addLocalFile(file, lookupFile);
+ List<String> extraFiles = new ArrayList<>(file.extraFiles());
+ extraFiles.add(remoteSstName);
+ return file.copy(extraFiles);
+ }
+
+ @Override
+ public boolean tryToDownload(DataFileMeta dataFile, File localFile) {
+ if (dataFile.schemaId() != schema.id()) {
+ return false;
+ }
+
+ String remoteSstName = lookupLevels.remoteSstName(dataFile.fileName());
+ if (dataFile.extraFiles().contains(remoteSstName)) {
+ Path remoteSstPath = remoteSstPath(dataFile, remoteSstName);
+ try (SeekableInputStream is = fileIO.newInputStream(remoteSstPath);
+ FileOutputStream os = new FileOutputStream(localFile)) {
+ IOUtils.copy(is, os);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private Path remoteSstPath(DataFileMeta file, String remoteSstName) {
+ return new Path(pathFactory.toPath(file).getParent(), remoteSstName);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 24b08f06c1..ee012b11ad 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -45,9 +45,10 @@ import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.LookupFile;
import org.apache.paimon.mergetree.LookupLevels;
-import org.apache.paimon.mergetree.LookupLevels.ContainsValueProcessor;
-import org.apache.paimon.mergetree.LookupLevels.KeyValueProcessor;
-import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValueProcessor;
+import org.apache.paimon.mergetree.LookupLevels.PersistEmptyProcessor;
+import org.apache.paimon.mergetree.LookupLevels.PersistPositionProcessor;
+import org.apache.paimon.mergetree.LookupLevels.PersistProcessor;
+import org.apache.paimon.mergetree.LookupLevels.PersistValueProcessor;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeWriter;
import org.apache.paimon.mergetree.compact.CompactRewriter;
@@ -63,6 +64,7 @@ import
org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.OffPeakHours;
+import org.apache.paimon.mergetree.compact.RemoteLookupFileManager;
import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -110,6 +112,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private final CoreOptions options;
private final RowType keyType;
private final RowType valueType;
+ private final FileIO fileIO;
private final RowType partitionType;
private final String commitUser;
@Nullable private final RecordLevelExpire recordLevelExpire;
@@ -144,6 +147,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
dbMaintainerFactory,
dvMaintainerFactory,
tableName);
+ this.fileIO = fileIO;
this.partitionType = partitionType;
this.keyType = keyType;
this.valueType = valueType;
@@ -310,8 +314,9 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
Levels levels,
@Nullable BucketedDvMaintainer dvMaintainer) {
DeletionVector.Factory dvFactory =
DeletionVector.factory(dvMaintainer);
- FileReaderFactory<KeyValue> readerFactory =
+ KeyValueFileReaderFactory keyReaderFactory =
readerFactoryBuilder.build(partition, bucket, dvFactory);
+ FileReaderFactory<KeyValue> readerFactory = keyReaderFactory;
if (recordLevelExpire != null) {
readerFactory = recordLevelExpire.wrap(readerFactory);
}
@@ -334,7 +339,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
mergeSorter,
logDedupEqualSupplier.get());
} else if (lookupStrategy.needLookup) {
- LookupLevels.ValueProcessor<?> processor;
+ PersistProcessor<?> processor;
LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<?>
wrapperFactory;
FileReaderFactory<KeyValue> lookupReaderFactory = readerFactory;
if (lookupStrategy.isFirstRow) {
@@ -347,27 +352,38 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
.copyWithoutProjection()
.withReadValueType(RowType.of())
.build(partition, bucket, dvFactory);
- processor = new ContainsValueProcessor();
+ processor = new PersistEmptyProcessor();
wrapperFactory = new FirstRowMergeFunctionWrapperFactory();
} else {
processor =
lookupStrategy.deletionVector
- ? new PositionedKeyValueProcessor(
+ ? new PersistPositionProcessor(
valueType,
lookupStrategy.produceChangelog
|| mergeEngine != DEDUPLICATE
||
!options.sequenceField().isEmpty())
- : new KeyValueProcessor(valueType);
+ : new PersistValueProcessor(valueType);
wrapperFactory =
new LookupMergeFunctionWrapperFactory<>(
logDedupEqualSupplier.get(),
lookupStrategy,
UserDefinedSeqComparator.create(valueType,
options));
}
+ LookupLevels<?> lookupLevels =
+ createLookupLevels(partition, bucket, levels, processor,
lookupReaderFactory);
+ RemoteLookupFileManager<?> remoteLookupFileManager = null;
+ if (options.lookupRemoteFileEnabled()) {
+ remoteLookupFileManager =
+ new RemoteLookupFileManager<>(
+ fileIO,
+ keyReaderFactory.pathFactory(),
+ keyReaderFactory.schema(),
+ lookupLevels);
+ }
return new LookupMergeTreeCompactRewriter(
maxLevel,
mergeEngine,
- createLookupLevels(partition, bucket, levels, processor,
lookupReaderFactory),
+ lookupLevels,
readerFactory,
writerFactory,
keyComparator,
@@ -377,7 +393,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
wrapperFactory,
lookupStrategy.produceChangelog,
dvMaintainer,
- options);
+ options,
+ remoteLookupFileManager);
} else {
return new MergeTreeCompactRewriter(
readerFactory,
@@ -393,7 +410,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
BinaryRow partition,
int bucket,
Levels levels,
- LookupLevels.ValueProcessor<T> valueProcessor,
+ PersistProcessor<T> valueProcessor,
FileReaderFactory<KeyValue> readerFactory) {
if (ioManager == null) {
throw new RuntimeException(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
index 27b47b129c..363523e6c7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
@@ -115,7 +115,7 @@ public class FormatTableWrite implements BatchTableWrite {
}
@Override
- public TableWrite withIOManager(IOManager ioManager) {
+ public BatchTableWrite withIOManager(IOManager ioManager) {
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 7e0cbc62f3..6b80c41ceb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -140,7 +140,8 @@ public class LocalTableQuery implements TableQuery {
levels,
keyComparatorSupplier.get(),
readerFactoryBuilder.keyType(),
- new
LookupLevels.KeyValueProcessor(readerFactoryBuilder.readValueType()),
+ new LookupLevels.PersistValueProcessor(
+ readerFactoryBuilder.readValueType()),
file -> {
RecordReader<KeyValue> reader =
factory.createRecordReader(file);
if (cacheRowFilter != null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
index 416bb58a30..538b50b3cd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.sink;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.types.RowType;
import java.util.List;
@@ -31,6 +32,9 @@ import java.util.List;
@Public
public interface BatchTableWrite extends TableWrite {
+ @Override
+ BatchTableWrite withIOManager(IOManager ioManager);
+
/**
* Prepare commit for {@link TableCommit}. Collect incremental files for
this write.
*
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 6448d1bdfb..27d436d0be 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -196,7 +196,7 @@ public class ContainsLevelsTest {
levels,
comparator,
keyType,
- new LookupLevels.ContainsValueProcessor(),
+ new LookupLevels.PersistEmptyProcessor(),
file -> createReaderFactory().createRecordReader(file),
file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new SortLookupStoreFactory(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index e264e2ba8e..bffed64a95 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -273,7 +273,7 @@ public class LookupLevelsTest {
levels,
comparator,
keyType,
- new LookupLevels.KeyValueProcessor(rowType),
+ new LookupLevels.PersistValueProcessor(rowType),
file -> createReaderFactory().createRecordReader(file),
file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
new SortLookupStoreFactory(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
new file mode 100644
index 0000000000..60d1546d18
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LookupTable} with deletion vectors. */
+public class DeletionVectorsTableTest extends TableTestBase {
+
+ @TempDir java.nio.file.Path tempDir;
+ private IOManager ioManager;
+
+ @BeforeEach
+ public void before() throws IOException {
+ this.ioManager = new IOManagerImpl(tempDir.toString());
+ }
+
+ @Test
+ public void testRemoteFile() throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
+ options.set(CoreOptions.LOOKUP_REMOTE_FILE_ENABLED, true);
+ Identifier identifier = new Identifier("default", "t");
+ Schema schema =
+ new Schema(
+ RowType.of(new IntType(), new IntType()).getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("f0"),
+ options.toMap(),
+ null);
+ catalog.createTable(identifier, schema, false);
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+ // first write
+ try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager);
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(1, 1));
+ write.write(GenericRow.of(2, 1));
+ write.write(GenericRow.of(3, 1));
+ commit.commit(write.prepareCommit());
+ }
+
+ // second write
+ try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager);
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(1, 1));
+ write.write(GenericRow.of(4, 1));
+ write.write(GenericRow.of(5, 1));
+ commit.commit(write.prepareCommit());
+ }
+
+ // plan to assert
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ assertThat(splits).hasSize(1);
+ DataSplit firstSplit = (DataSplit) splits.get(0);
+ DataFileMeta firstFile = firstSplit.dataFiles().get(0);
+ List<String> extraFiles = firstFile.extraFiles();
+ assertThat(extraFiles.get(0)).endsWith(".position.v1.lookup");
+
+ // third write with lookup but no data file
+
+ // delete file first
+ LocalFileIO fileIO = LocalFileIO.create();
+ Path firstPath =
+ table.store()
+ .pathFactory()
.createDataFilePathFactory(firstSplit.partition(), firstSplit.bucket())
+ .toPath(firstFile);
+ Path tmpPath = new Path(firstPath.getParent(), "tmp_file");
+ fileIO.copyFile(firstPath, tmpPath, false);
+ fileIO.delete(firstPath, false);
+
+ // should no exception when lookup in write
+ table = table.copy(Collections.singletonMap(CoreOptions.COMPACTION_SIZE_RATIO.key(), "0"));
+ writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager);
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(1, 2));
+ write.write(GenericRow.of(2, 2));
+ commit.commit(write.prepareCommit());
+ }
+
+ // restore file and check reading
+ fileIO.copyFile(tmpPath, firstPath, false);
+ splits = readBuilder.newScan().plan().splits();
+ List<GenericRow> result = new ArrayList<>();
+ readBuilder
+ .newRead()
+ .createReader(splits)
.forEachRemaining(r -> result.add(GenericRow.of(r.getInt(0), r.getInt(1))));
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 2),
+ GenericRow.of(2, 2),
+ GenericRow.of(3, 1),
+ GenericRow.of(4, 1),
+ GenericRow.of(5, 1));
+ }
+}