This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new da6e72052c [core] Introduce 'lookup.remote-file.enabled' to optimize lookup (#6533)
da6e72052c is described below

commit da6e72052cc7693dca75b277eae5acb206b32e29
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Nov 6 11:11:36 2025 +0800

    [core] Introduce 'lookup.remote-file.enabled' to optimize lookup (#6533)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  10 ++
 .../paimon/io/KeyValueFileReaderFactory.java       |   8 ++
 .../org/apache/paimon/mergetree/LookupFile.java    |  16 ++-
 .../org/apache/paimon/mergetree/LookupLevels.java  | 113 +++++++++++-----
 .../compact/ChangelogMergeTreeRewriter.java        |   1 +
 .../compact/LookupMergeTreeCompactRewriter.java    |  24 +++-
 .../compact/MergeTreeCompactRewriter.java          |   8 +-
 .../mergetree/compact/RemoteLookupFileManager.java | 105 +++++++++++++++
 .../paimon/operation/KeyValueFileStoreWrite.java   |  39 ++++--
 .../paimon/table/format/FormatTableWrite.java      |   2 +-
 .../apache/paimon/table/query/LocalTableQuery.java |   3 +-
 .../apache/paimon/table/sink/BatchTableWrite.java  |   4 +
 .../paimon/mergetree/ContainsLevelsTest.java       |   2 +-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |   2 +-
 .../flink/lookup/DeletionVectorsTableTest.java     | 148 +++++++++++++++++++++
 16 files changed, 436 insertions(+), 55 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 0b7fcca75f..3e2b78d3fb 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -705,6 +705,12 @@ Mainly to resolve data skew on primary keys. We recommend 
starting with 64 mb wh
             <td>Integer</td>
             <td>Threshold for merging records to binary buffer in lookup.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.remote-file.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable the remote file for lookup.</td>
+        </tr>
         <tr>
             <td><h5>manifest.compression</h5></td>
             <td style="word-wrap: break-word;">"zstd"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index c90760183d..7f8bc8728a 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1177,6 +1177,12 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Define the default false positive probability for 
lookup cache bloom filters.");
 
+    public static final ConfigOption<Boolean> LOOKUP_REMOTE_FILE_ENABLED =
+            key("lookup.remote-file.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to enable the remote file for 
lookup.");
+
     public static final ConfigOption<Integer> READ_BATCH_SIZE =
             key("read.batch-size")
                     .intType()
@@ -2463,6 +2469,10 @@ public class CoreOptions implements Serializable {
         return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
     }
 
+    public boolean lookupRemoteFileEnabled() {
+        return options.get(LOOKUP_REMOTE_FILE_ENABLED);
+    }
+
     public double lookupCacheHighPrioPoolRatio() {
         return options.get(LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 0b28bc0b4e..8459c97516 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -95,6 +95,14 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
         this.dvFactory = dvFactory;
     }
 
+    public TableSchema schema() {
+        return schema;
+    }
+
+    public DataFilePathFactory pathFactory() {
+        return pathFactory;
+    }
+
     @Override
     public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws 
IOException {
         if (file.fileSize() >= asyncThreshold && 
file.fileName().endsWith(".orc")) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
index 7469684dac..52d669bf6e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.mergetree;
 
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.lookup.LookupStoreReader;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.types.RowType;
@@ -49,7 +48,7 @@ public class LookupFile {
     private static final Logger LOG = 
LoggerFactory.getLogger(LookupFile.class);
 
     private final File localFile;
-    private final DataFileMeta remoteFile;
+    private final int level;
     private final LookupStoreReader reader;
     private final Runnable callback;
 
@@ -57,14 +56,17 @@ public class LookupFile {
     private long hitCount;
     private boolean isClosed = false;
 
-    public LookupFile(
-            File localFile, DataFileMeta remoteFile, LookupStoreReader reader, 
Runnable callback) {
+    public LookupFile(File localFile, int level, LookupStoreReader reader, 
Runnable callback) {
         this.localFile = localFile;
-        this.remoteFile = remoteFile;
+        this.level = level;
         this.reader = reader;
         this.callback = callback;
     }
 
+    public File localFile() {
+        return localFile;
+    }
+
     @Nullable
     public byte[] get(byte[] key) throws IOException {
         checkArgument(!isClosed);
@@ -76,8 +78,8 @@ public class LookupFile {
         return res;
     }
 
-    public DataFileMeta remoteFile() {
-        return remoteFile;
+    public int level() {
+        return level;
     }
 
     public boolean isClosed() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index f7904af7ea..f90e22cd10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -55,23 +55,27 @@ import static 
org.apache.paimon.utils.VarLengthIntUtils.encodeLong;
 /** Provide lookup by key. */
 public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
 
+    public static final String CURRENT_VERSION = "v1";
+    public static final String REMOTE_LOOKUP_FILE_SUFFIX = ".lookup";
+
     private final Levels levels;
     private final Comparator<InternalRow> keyComparator;
     private final RowCompactedSerializer keySerializer;
-    private final ValueProcessor<T> valueProcessor;
+    private final PersistProcessor<T> persistProcessor;
     private final IOFunction<DataFileMeta, RecordReader<KeyValue>> 
fileReaderFactory;
     private final Function<String, File> localFileFactory;
     private final LookupStoreFactory lookupStoreFactory;
     private final Function<Long, BloomFilter.Builder> bfGenerator;
-
     private final Cache<String, LookupFile> lookupFileCache;
     private final Set<String> ownCachedFiles;
 
+    @Nullable private RemoteFileDownloader remoteFileDownloader;
+
     public LookupLevels(
             Levels levels,
             Comparator<InternalRow> keyComparator,
             RowType keyType,
-            ValueProcessor<T> valueProcessor,
+            PersistProcessor<T> persistProcessor,
             IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
             Function<String, File> localFileFactory,
             LookupStoreFactory lookupStoreFactory,
@@ -80,7 +84,7 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
         this.levels = levels;
         this.keyComparator = keyComparator;
         this.keySerializer = new RowCompactedSerializer(keyType);
-        this.valueProcessor = valueProcessor;
+        this.persistProcessor = persistProcessor;
         this.fileReaderFactory = fileReaderFactory;
         this.localFileFactory = localFileFactory;
         this.lookupStoreFactory = lookupStoreFactory;
@@ -90,6 +94,10 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
         levels.addDropFileCallback(this);
     }
 
+    public void setRemoteFileDownloader(@Nullable RemoteFileDownloader 
remoteFileDownloader) {
+        this.remoteFileDownloader = remoteFileDownloader;
+    }
+
     public Levels getLevels() {
         return levels;
     }
@@ -140,33 +148,51 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
             valueBytes = lookupFile.get(keyBytes);
         } finally {
             if (newCreatedLookupFile) {
-                lookupFileCache.put(file.fileName(), lookupFile);
+                addLocalFile(file, lookupFile);
             }
         }
         if (valueBytes == null) {
             return null;
         }
 
-        return valueProcessor.readFromDisk(
-                key, lookupFile.remoteFile().level(), valueBytes, 
file.fileName());
+        return persistProcessor.readFromDisk(key, lookupFile.level(), 
valueBytes, file.fileName());
     }
 
-    private LookupFile createLookupFile(DataFileMeta file) throws IOException {
+    public LookupFile createLookupFile(DataFileMeta file) throws IOException {
         File localFile = localFileFactory.apply(file.fileName());
         if (!localFile.createNewFile()) {
             throw new IOException("Can not create new file: " + localFile);
         }
-        LookupStoreWriter kvWriter =
-                lookupStoreFactory.createWriter(localFile, 
bfGenerator.apply(file.rowCount()));
-        try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
+
+        if (remoteFileDownloader == null || 
!remoteFileDownloader.tryToDownload(file, localFile)) {
+            createSstFileFromDataFile(file, localFile);
+        }
+
+        ownCachedFiles.add(file.fileName());
+        return new LookupFile(
+                localFile,
+                file.level(),
+                lookupStoreFactory.createReader(localFile),
+                () -> ownCachedFiles.remove(file.fileName()));
+    }
+
+    public void addLocalFile(DataFileMeta file, LookupFile lookupFile) {
+        lookupFileCache.put(file.fileName(), lookupFile);
+    }
+
+    private void createSstFileFromDataFile(DataFileMeta file, File localFile) 
throws IOException {
+        try (LookupStoreWriter kvWriter =
+                        lookupStoreFactory.createWriter(
+                                localFile, bfGenerator.apply(file.rowCount()));
+                RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) 
{
             KeyValue kv;
-            if (valueProcessor.withPosition()) {
+            if (persistProcessor.withPosition()) {
                 FileRecordIterator<KeyValue> batch;
                 while ((batch = (FileRecordIterator<KeyValue>) 
reader.readBatch()) != null) {
                     while ((kv = batch.next()) != null) {
                         byte[] keyBytes = 
keySerializer.serializeToBytes(kv.key());
                         byte[] valueBytes =
-                                valueProcessor.persistToDisk(kv, 
batch.returnedPosition());
+                                persistProcessor.persistToDisk(kv, 
batch.returnedPosition());
                         kvWriter.put(keyBytes, valueBytes);
                     }
                     batch.releaseBatch();
@@ -176,7 +202,7 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
                 while ((batch = reader.readBatch()) != null) {
                     while ((kv = batch.next()) != null) {
                         byte[] keyBytes = 
keySerializer.serializeToBytes(kv.key());
-                        byte[] valueBytes = valueProcessor.persistToDisk(kv);
+                        byte[] valueBytes = persistProcessor.persistToDisk(kv);
                         kvWriter.put(keyBytes, valueBytes);
                     }
                     batch.releaseBatch();
@@ -185,16 +211,16 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
         } catch (IOException e) {
             FileIOUtils.deleteFileOrDirectory(localFile);
             throw e;
-        } finally {
-            kvWriter.close();
         }
+    }
 
-        ownCachedFiles.add(file.fileName());
-        return new LookupFile(
-                localFile,
-                file,
-                lookupStoreFactory.createReader(localFile),
-                () -> ownCachedFiles.remove(file.fileName()));
+    public String remoteSstName(String dataFileName) {
+        return dataFileName
+                + "."
+                + persistProcessor.identifier()
+                + "."
+                + CURRENT_VERSION
+                + REMOTE_LOOKUP_FILE_SUFFIX;
     }
 
     @Override
@@ -206,7 +232,9 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
     }
 
     /** Processor to process value. */
-    public interface ValueProcessor<T> {
+    public interface PersistProcessor<T> {
+
+        String identifier();
 
         boolean withPosition();
 
@@ -219,15 +247,20 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
         T readFromDisk(InternalRow key, int level, byte[] valueBytes, String 
fileName);
     }
 
-    /** A {@link ValueProcessor} to return {@link KeyValue}. */
-    public static class KeyValueProcessor implements ValueProcessor<KeyValue> {
+    /** A {@link PersistProcessor} to return {@link KeyValue}. */
+    public static class PersistValueProcessor implements 
PersistProcessor<KeyValue> {
 
         private final RowCompactedSerializer valueSerializer;
 
-        public KeyValueProcessor(RowType valueType) {
+        public PersistValueProcessor(RowType valueType) {
             this.valueSerializer = new RowCompactedSerializer(valueType);
         }
 
+        @Override
+        public String identifier() {
+            return "value";
+        }
+
         @Override
         public boolean withPosition() {
             return false;
@@ -253,11 +286,16 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
         }
     }
 
-    /** A {@link ValueProcessor} to return {@link Boolean} only. */
-    public static class ContainsValueProcessor implements 
ValueProcessor<Boolean> {
+    /** A {@link PersistProcessor} to return {@link Boolean} only. */
+    public static class PersistEmptyProcessor implements 
PersistProcessor<Boolean> {
 
         private static final byte[] EMPTY_BYTES = new byte[0];
 
+        @Override
+        public String identifier() {
+            return "empty";
+        }
+
         @Override
         public boolean withPosition() {
             return false;
@@ -274,16 +312,22 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
         }
     }
 
-    /** A {@link ValueProcessor} to return {@link PositionedKeyValue}. */
-    public static class PositionedKeyValueProcessor implements 
ValueProcessor<PositionedKeyValue> {
+    /** A {@link PersistProcessor} to return {@link PositionedKeyValue}. */
+    public static class PersistPositionProcessor implements 
PersistProcessor<PositionedKeyValue> {
+
         private final boolean persistValue;
         private final RowCompactedSerializer valueSerializer;
 
-        public PositionedKeyValueProcessor(RowType valueType, boolean 
persistValue) {
+        public PersistPositionProcessor(RowType valueType, boolean 
persistValue) {
             this.persistValue = persistValue;
             this.valueSerializer = persistValue ? new 
RowCompactedSerializer(valueType) : null;
         }
 
+        @Override
+        public String identifier() {
+            return persistValue ? "position-and-value" : "position";
+        }
+
         @Override
         public boolean withPosition() {
             return true;
@@ -334,6 +378,7 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
 
     /** {@link KeyValue} with file name and row position for DeletionVector. */
     public static class PositionedKeyValue {
+
         private final @Nullable KeyValue keyValue;
         private final String fileName;
         private final long rowPosition;
@@ -357,4 +402,10 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
             return keyValue;
         }
     }
+
+    /** Downloader to try to download remote lookup file to local. */
+    public interface RemoteFileDownloader {
+
+        boolean tryToDownload(DataFileMeta dataFile, File localFile);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 56c61d8743..4ef71757c3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -184,6 +184,7 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
 
         if (rewriteCompactFile) {
             notifyRewriteCompactBefore(before);
+            after = notifyRewriteCompactAfter(after);
         }
 
         List<DataFileMeta> changelogFiles =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 896fba0d60..be038a2123 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -38,6 +38,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +60,8 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
     @Nullable private final BucketedDvMaintainer dvMaintainer;
     private final IntFunction<String> level2FileFormat;
 
+    @Nullable private final RemoteLookupFileManager<T> remoteLookupFileManager;
+
     public LookupMergeTreeCompactRewriter(
             int maxLevel,
             MergeEngine mergeEngine,
@@ -72,7 +75,8 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
             MergeFunctionWrapperFactory<T> wrapperFactory,
             boolean produceChangelog,
             @Nullable BucketedDvMaintainer dvMaintainer,
-            CoreOptions options) {
+            CoreOptions options,
+            @Nullable RemoteLookupFileManager<T> remoteLookupFileManager) {
         super(
                 maxLevel,
                 mergeEngine,
@@ -91,6 +95,7 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
         String fileFormat = options.fileFormatString();
         Map<Integer, String> fileFormatPerLevel = options.fileFormatPerLevel();
         this.level2FileFormat = level -> 
fileFormatPerLevel.getOrDefault(level, fileFormat);
+        this.remoteLookupFileManager = remoteLookupFileManager;
     }
 
     @Override
@@ -100,6 +105,23 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
         }
     }
 
+    @Override
+    protected List<DataFileMeta> notifyRewriteCompactAfter(List<DataFileMeta> 
files) {
+        if (remoteLookupFileManager == null) {
+            return files;
+        }
+
+        List<DataFileMeta> result = new ArrayList<>();
+        for (DataFileMeta file : files) {
+            try {
+                result.add(remoteLookupFileManager.genRemoteLookupFile(file));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return result;
+    }
+
     @Override
     protected boolean rewriteChangelog(
             int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index df299fd84c..f235826187 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -104,7 +104,9 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
 
         List<DataFileMeta> before = extractFilesFromSections(sections);
         notifyRewriteCompactBefore(before);
-        return new CompactResult(before, writer.result());
+        List<DataFileMeta> after = writer.result();
+        after = notifyRewriteCompactAfter(after);
+        return new CompactResult(before, after);
     }
 
     protected <T> RecordReader<T> readerForMergeTree(
@@ -120,4 +122,8 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
     }
 
     protected void notifyRewriteCompactBefore(List<DataFileMeta> files) {}
+
+    protected List<DataFileMeta> notifyRewriteCompactAfter(List<DataFileMeta> 
files) {
+        return files;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
new file mode 100644
index 0000000000..23aed652c8
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.mergetree.LookupFile;
+import org.apache.paimon.mergetree.LookupLevels;
+import org.apache.paimon.mergetree.LookupLevels.RemoteFileDownloader;
+import org.apache.paimon.schema.TableSchema;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Manager to manage remote files for lookup. */
+public class RemoteLookupFileManager<T> implements RemoteFileDownloader {
+
+    private final FileIO fileIO;
+    private final DataFilePathFactory pathFactory;
+    private final TableSchema schema;
+    private final LookupLevels<T> lookupLevels;
+
+    public RemoteLookupFileManager(
+            FileIO fileIO,
+            DataFilePathFactory pathFactory,
+            TableSchema schema,
+            LookupLevels<T> lookupLevels) {
+        this.fileIO = fileIO;
+        this.pathFactory = pathFactory;
+        this.schema = schema;
+        this.lookupLevels = lookupLevels;
+        this.lookupLevels.setRemoteFileDownloader(this);
+    }
+
+    public DataFileMeta genRemoteLookupFile(DataFileMeta file) throws 
IOException {
+        String remoteSstName = lookupLevels.remoteSstName(file.fileName());
+        if (file.extraFiles().contains(remoteSstName)) {
+            // ignore existed
+            return file;
+        }
+
+        Path sstFile = remoteSstPath(file, remoteSstName);
+        LookupFile lookupFile = lookupLevels.createLookupFile(file);
+        try (FileInputStream is = new FileInputStream(lookupFile.localFile());
+                PositionOutputStream os = fileIO.newOutputStream(sstFile, 
false)) {
+            IOUtils.copy(is, os);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        lookupLevels.addLocalFile(file, lookupFile);
+        List<String> extraFiles = new ArrayList<>(file.extraFiles());
+        extraFiles.add(remoteSstName);
+        return file.copy(extraFiles);
+    }
+
+    @Override
+    public boolean tryToDownload(DataFileMeta dataFile, File localFile) {
+        if (dataFile.schemaId() != schema.id()) {
+            return false;
+        }
+
+        String remoteSstName = lookupLevels.remoteSstName(dataFile.fileName());
+        if (dataFile.extraFiles().contains(remoteSstName)) {
+            Path remoteSstPath = remoteSstPath(dataFile, remoteSstName);
+            try (SeekableInputStream is = fileIO.newInputStream(remoteSstPath);
+                    FileOutputStream os = new FileOutputStream(localFile)) {
+                IOUtils.copy(is, os);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    private Path remoteSstPath(DataFileMeta file, String remoteSstName) {
+        return new Path(pathFactory.toPath(file).getParent(), remoteSstName);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 24b08f06c1..ee012b11ad 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -45,9 +45,10 @@ import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.Levels;
 import org.apache.paimon.mergetree.LookupFile;
 import org.apache.paimon.mergetree.LookupLevels;
-import org.apache.paimon.mergetree.LookupLevels.ContainsValueProcessor;
-import org.apache.paimon.mergetree.LookupLevels.KeyValueProcessor;
-import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValueProcessor;
+import org.apache.paimon.mergetree.LookupLevels.PersistEmptyProcessor;
+import org.apache.paimon.mergetree.LookupLevels.PersistPositionProcessor;
+import org.apache.paimon.mergetree.LookupLevels.PersistProcessor;
+import org.apache.paimon.mergetree.LookupLevels.PersistValueProcessor;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeWriter;
 import org.apache.paimon.mergetree.compact.CompactRewriter;
@@ -63,6 +64,7 @@ import 
org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
 import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
 import org.apache.paimon.mergetree.compact.OffPeakHours;
+import org.apache.paimon.mergetree.compact.RemoteLookupFileManager;
 import org.apache.paimon.mergetree.compact.UniversalCompaction;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -110,6 +112,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
     private final CoreOptions options;
     private final RowType keyType;
     private final RowType valueType;
+    private final FileIO fileIO;
     private final RowType partitionType;
     private final String commitUser;
     @Nullable private final RecordLevelExpire recordLevelExpire;
@@ -144,6 +147,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 dbMaintainerFactory,
                 dvMaintainerFactory,
                 tableName);
+        this.fileIO = fileIO;
         this.partitionType = partitionType;
         this.keyType = keyType;
         this.valueType = valueType;
@@ -310,8 +314,9 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             Levels levels,
             @Nullable BucketedDvMaintainer dvMaintainer) {
         DeletionVector.Factory dvFactory = 
DeletionVector.factory(dvMaintainer);
-        FileReaderFactory<KeyValue> readerFactory =
+        KeyValueFileReaderFactory keyReaderFactory =
                 readerFactoryBuilder.build(partition, bucket, dvFactory);
+        FileReaderFactory<KeyValue> readerFactory = keyReaderFactory;
         if (recordLevelExpire != null) {
             readerFactory = recordLevelExpire.wrap(readerFactory);
         }
@@ -334,7 +339,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                     mergeSorter,
                     logDedupEqualSupplier.get());
         } else if (lookupStrategy.needLookup) {
-            LookupLevels.ValueProcessor<?> processor;
+            PersistProcessor<?> processor;
             LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<?> 
wrapperFactory;
             FileReaderFactory<KeyValue> lookupReaderFactory = readerFactory;
             if (lookupStrategy.isFirstRow) {
@@ -347,27 +352,38 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                                 .copyWithoutProjection()
                                 .withReadValueType(RowType.of())
                                 .build(partition, bucket, dvFactory);
-                processor = new ContainsValueProcessor();
+                processor = new PersistEmptyProcessor();
                 wrapperFactory = new FirstRowMergeFunctionWrapperFactory();
             } else {
                 processor =
                         lookupStrategy.deletionVector
-                                ? new PositionedKeyValueProcessor(
+                                ? new PersistPositionProcessor(
                                         valueType,
                                         lookupStrategy.produceChangelog
                                                 || mergeEngine != DEDUPLICATE
                                                 || 
!options.sequenceField().isEmpty())
-                                : new KeyValueProcessor(valueType);
+                                : new PersistValueProcessor(valueType);
                 wrapperFactory =
                         new LookupMergeFunctionWrapperFactory<>(
                                 logDedupEqualSupplier.get(),
                                 lookupStrategy,
                                 UserDefinedSeqComparator.create(valueType, 
options));
             }
+            LookupLevels<?> lookupLevels =
+                    createLookupLevels(partition, bucket, levels, processor, 
lookupReaderFactory);
+            RemoteLookupFileManager<?> remoteLookupFileManager = null;
+            if (options.lookupRemoteFileEnabled()) {
+                remoteLookupFileManager =
+                        new RemoteLookupFileManager<>(
+                                fileIO,
+                                keyReaderFactory.pathFactory(),
+                                keyReaderFactory.schema(),
+                                lookupLevels);
+            }
             return new LookupMergeTreeCompactRewriter(
                     maxLevel,
                     mergeEngine,
-                    createLookupLevels(partition, bucket, levels, processor, 
lookupReaderFactory),
+                    lookupLevels,
                     readerFactory,
                     writerFactory,
                     keyComparator,
@@ -377,7 +393,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                     wrapperFactory,
                     lookupStrategy.produceChangelog,
                     dvMaintainer,
-                    options);
+                    options,
+                    remoteLookupFileManager);
         } else {
             return new MergeTreeCompactRewriter(
                     readerFactory,
@@ -393,7 +410,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             BinaryRow partition,
             int bucket,
             Levels levels,
-            LookupLevels.ValueProcessor<T> valueProcessor,
+            PersistProcessor<T> valueProcessor,
             FileReaderFactory<KeyValue> readerFactory) {
         if (ioManager == null) {
             throw new RuntimeException(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
index 27b47b129c..363523e6c7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
@@ -115,7 +115,7 @@ public class FormatTableWrite implements BatchTableWrite {
     }
 
     @Override
-    public TableWrite withIOManager(IOManager ioManager) {
+    public BatchTableWrite withIOManager(IOManager ioManager) {
         return this;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 7e0cbc62f3..6b80c41ceb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -140,7 +140,8 @@ public class LocalTableQuery implements TableQuery {
                         levels,
                         keyComparatorSupplier.get(),
                         readerFactoryBuilder.keyType(),
-                        new 
LookupLevels.KeyValueProcessor(readerFactoryBuilder.readValueType()),
+                        new LookupLevels.PersistValueProcessor(
+                                readerFactoryBuilder.readValueType()),
                         file -> {
                             RecordReader<KeyValue> reader = 
factory.createRecordReader(file);
                             if (cacheRowFilter != null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
index 416bb58a30..538b50b3cd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.sink;
 
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.types.RowType;
 
 import java.util.List;
@@ -31,6 +32,9 @@ import java.util.List;
 @Public
 public interface BatchTableWrite extends TableWrite {
 
+    @Override
+    BatchTableWrite withIOManager(IOManager ioManager);
+
     /**
      * Prepare commit for {@link TableCommit}. Collect incremental files for 
this write.
      *
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 6448d1bdfb..27d436d0be 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -196,7 +196,7 @@ public class ContainsLevelsTest {
                 levels,
                 comparator,
                 keyType,
-                new LookupLevels.ContainsValueProcessor(),
+                new LookupLevels.PersistEmptyProcessor(),
                 file -> createReaderFactory().createRecordReader(file),
                 file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new SortLookupStoreFactory(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index e264e2ba8e..bffed64a95 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -273,7 +273,7 @@ public class LookupLevelsTest {
                 levels,
                 comparator,
                 keyType,
-                new LookupLevels.KeyValueProcessor(rowType),
+                new LookupLevels.PersistValueProcessor(rowType),
                 file -> createReaderFactory().createRecordReader(file),
                 file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new SortLookupStoreFactory(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
new file mode 100644
index 0000000000..60d1546d18
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests compaction-time lookup on a primary-key table with deletion vectors enabled and {@link
+ * CoreOptions#LOOKUP_REMOTE_FILE_ENABLED} turned on.
+ *
+ * <p>The scenario checks that a lookup file is recorded as an extra file of the data file, and that
+ * a later write which forces compaction does not fail while that data file is temporarily missing.
+ */
+public class DeletionVectorsTableTest extends TableTestBase {
+
+    @TempDir java.nio.file.Path tempDir;
+    private IOManager ioManager;
+
+    @BeforeEach
+    public void before() throws IOException {
+        this.ioManager = new IOManagerImpl(tempDir.toString());
+    }
+
+    /**
+     * Closes the {@link IOManager} created in {@link #before()} so it does not outlive the test.
+     * Deliberately not named {@code after()} to avoid shadowing a lifecycle method of the base
+     * class.
+     */
+    @org.junit.jupiter.api.AfterEach
+    public void closeIOManager() throws Exception {
+        if (ioManager != null) {
+            ioManager.close();
+        }
+    }
+
+    @Test
+    public void testRemoteFile() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
+        options.set(CoreOptions.LOOKUP_REMOTE_FILE_ENABLED, true);
+        Identifier identifier = new Identifier("default", "t");
+        Schema schema =
+                new Schema(
+                        RowType.of(new IntType(), new IntType()).getFields(),
+                        Collections.emptyList(),
+                        Collections.singletonList("f0"),
+                        options.toMap(),
+                        null);
+        catalog.createTable(identifier, schema, false);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+        // first write
+        try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager);
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(1, 1));
+            write.write(GenericRow.of(2, 1));
+            write.write(GenericRow.of(3, 1));
+            commit.commit(write.prepareCommit());
+        }
+
+        // second write: key 1 overlaps with the first write
+        try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager);
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(1, 1));
+            write.write(GenericRow.of(4, 1));
+            write.write(GenericRow.of(5, 1));
+            commit.commit(write.prepareCommit());
+        }
+
+        // the data file must carry its lookup file as an extra file
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        assertThat(splits).hasSize(1);
+        DataSplit firstSplit = (DataSplit) splits.get(0);
+        DataFileMeta firstFile = firstSplit.dataFiles().get(0);
+        List<String> extraFiles = firstFile.extraFiles();
+        // fail with a clear assertion rather than an IndexOutOfBoundsException
+        assertThat(extraFiles).isNotEmpty();
+        assertThat(extraFiles.get(0)).endsWith(".position.v1.lookup");
+
+        // third write: lookup is needed but the data file is unavailable.
+        // Move the data file aside first (it is restored before reading).
+        LocalFileIO fileIO = LocalFileIO.create();
+        Path firstPath =
+                table.store()
+                        .pathFactory()
+                        .createDataFilePathFactory(firstSplit.partition(), firstSplit.bucket())
+                        .toPath(firstFile);
+        Path tmpPath = new Path(firstPath.getParent(), "tmp_file");
+        fileIO.copyFile(firstPath, tmpPath, false);
+        fileIO.delete(firstPath, false);
+
+        // must not throw: the lookup during this write should not need the missing data file
+        table =
+                table.copy(
+                        Collections.singletonMap(CoreOptions.COMPACTION_SIZE_RATIO.key(), "0"));
+        writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager);
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(1, 2));
+            write.write(GenericRow.of(2, 2));
+            commit.commit(write.prepareCommit());
+        }
+
+        // restore the data file and verify the merged result
+        fileIO.copyFile(tmpPath, firstPath, false);
+        splits = readBuilder.newScan().plan().splits();
+        List<GenericRow> result = new ArrayList<>();
+        readBuilder
+                .newRead()
+                .createReader(splits)
+                .forEachRemaining(r -> result.add(GenericRow.of(r.getInt(0), r.getInt(1))));
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 2),
+                        GenericRow.of(2, 2),
+                        GenericRow.of(3, 1),
+                        GenericRow.of(4, 1),
+                        GenericRow.of(5, 1));
+    }
+}


Reply via email to