This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e04f3c7df [core] Pass file ser version to LookupSerializerFactory to let it deserialize from old version (#6677)
4e04f3c7df is described below

commit 4e04f3c7df49c410623c2b7b41c8b8b4189ec257
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 26 13:12:59 2025 +0800

    [core] Pass file ser version to LookupSerializerFactory to let it deserialize from old version (#6677)
---
 .../org/apache/paimon/mergetree/LookupFile.java    |  13 ++-
 .../org/apache/paimon/mergetree/LookupLevels.java  | 102 ++++++++++++++++-----
 .../lookup/DefaultLookupSerializerFactory.java     |   7 +-
 .../mergetree/lookup/LookupSerializerFactory.java  |   4 +-
 .../mergetree/lookup/PersistEmptyProcessor.java    |   4 +-
 .../mergetree/lookup/PersistPositionProcessor.java |   4 +-
 .../paimon/mergetree/lookup/PersistProcessor.java  |   4 +-
 .../lookup/PersistValueAndPosProcessor.java        |   7 +-
 .../mergetree/lookup/PersistValueProcessor.java    |   7 +-
 .../mergetree/lookup/RemoteFileDownloader.java     |   2 +-
 .../mergetree/lookup/RemoteLookupFileManager.java  |  24 +----
 11 files changed, 122 insertions(+), 56 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
index 219d9295b6..ae59876f53 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
@@ -50,6 +50,7 @@ public class LookupFile {
     private final File localFile;
     private final int level;
     private final long schemaId;
+    private final String serVersion;
     private final LookupStoreReader reader;
     private final Runnable callback;
 
@@ -58,10 +59,16 @@ public class LookupFile {
     private boolean isClosed = false;
 
     public LookupFile(
-            File localFile, int level, long schemaId, LookupStoreReader 
reader, Runnable callback) {
+            File localFile,
+            int level,
+            long schemaId,
+            String serVersion,
+            LookupStoreReader reader,
+            Runnable callback) {
         this.localFile = localFile;
         this.level = level;
         this.schemaId = schemaId;
+        this.serVersion = serVersion;
         this.reader = reader;
         this.callback = callback;
     }
@@ -74,6 +81,10 @@ public class LookupFile {
         return schemaId;
     }
 
+    public String serVersion() {
+        return serVersion;
+    }
+
     @Nullable
     public byte[] get(byte[] key) throws IOException {
         checkArgument(!isClosed);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index 169d0923b2..f8dce8f2e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -34,6 +34,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BloomFilter;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.IOFunction;
+import org.apache.paimon.utils.Pair;
 
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
 
@@ -45,6 +46,7 @@ import java.io.IOException;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -68,8 +70,7 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
     private final Function<Long, BloomFilter.Builder> bfGenerator;
     private final Cache<String, LookupFile> lookupFileCache;
     private final Set<String> ownCachedFiles;
-    private final String remoteSstSuffix;
-    private final Map<Long, PersistProcessor<T>> schemaIdToProcessors;
+    private final Map<Pair<Long, String>, PersistProcessor<T>> 
schemaIdAndSerVersionToProcessors;
 
     @Nullable private RemoteFileDownloader remoteFileDownloader;
 
@@ -99,13 +100,7 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
         this.bfGenerator = bfGenerator;
         this.lookupFileCache = lookupFileCache;
         this.ownCachedFiles = new HashSet<>();
-        this.remoteSstSuffix =
-                "."
-                        + processorFactory.identifier()
-                        + "."
-                        + serializerFactory.identifier()
-                        + REMOTE_LOOKUP_FILE_SUFFIX;
-        this.schemaIdToProcessors = new ConcurrentHashMap<>();
+        this.schemaIdAndSerVersionToProcessors = new ConcurrentHashMap<>();
         levels.addDropFileCallback(this);
     }
 
@@ -170,17 +165,17 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
             return null;
         }
 
-        return getOrCreateProcessor(lookupFile.schemaId())
+        return getOrCreateProcessor(lookupFile.schemaId(), 
lookupFile.serVersion())
                 .readFromDisk(key, lookupFile.level(), valueBytes, 
file.fileName());
     }
 
-    private PersistProcessor<T> getOrCreateProcessor(long schemaId) {
-        return schemaIdToProcessors.computeIfAbsent(
-                schemaId,
+    private PersistProcessor<T> getOrCreateProcessor(long schemaId, String 
serVersion) {
+        return schemaIdAndSerVersionToProcessors.computeIfAbsent(
+                Pair.of(schemaId, serVersion),
                 id -> {
                     RowType fileSchema =
                             schemaId == currentSchemaId ? null : 
schemaFunction.apply(schemaId);
-                    return processorFactory.create(serializerFactory, 
fileSchema);
+                    return processorFactory.create(serVersion, 
serializerFactory, fileSchema);
                 });
     }
 
@@ -191,9 +186,12 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
         }
 
         long schemaId = this.currentSchemaId;
-        if (tryToDownloadRemoteSst(file, localFile)) {
+        String fileSerVersion = serializerFactory.version();
+        Optional<String> downloadSerVersion = tryToDownloadRemoteSst(file, 
localFile);
+        if (downloadSerVersion.isPresent()) {
             // use schema id from remote file
             schemaId = file.schemaId();
+            fileSerVersion = downloadSerVersion.get();
         } else {
             createSstFileFromDataFile(file, localFile);
         }
@@ -203,21 +201,35 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
                 localFile,
                 file.level(),
                 schemaId,
+                fileSerVersion,
                 lookupStoreFactory.createReader(localFile),
                 () -> ownCachedFiles.remove(file.fileName()));
     }
 
-    private boolean tryToDownloadRemoteSst(DataFileMeta file, File localFile) {
+    private Optional<String> tryToDownloadRemoteSst(DataFileMeta file, File 
localFile) {
         if (remoteFileDownloader == null) {
-            return false;
+            return Optional.empty();
+        }
+        Optional<RemoteSstFile> remoteSstFile = remoteSst(file);
+        if (!remoteSstFile.isPresent()) {
+            return Optional.empty();
         }
+
+        RemoteSstFile remoteSst = remoteSstFile.get();
+
         // validate schema matched, no exception here
         try {
-            getOrCreateProcessor(file.schemaId());
+            getOrCreateProcessor(file.schemaId(), remoteSst.serVersion);
         } catch (UnsupportedOperationException e) {
-            return false;
+            return Optional.empty();
+        }
+        boolean success =
+                remoteFileDownloader.tryToDownload(file, 
remoteSst.sstFileName, localFile);
+        if (!success) {
+            return Optional.empty();
         }
-        return remoteFileDownloader.tryToDownload(file, localFile);
+
+        return Optional.of(remoteSst.serVersion);
     }
 
     public void addLocalFile(DataFileMeta file, LookupFile lookupFile) {
@@ -229,7 +241,8 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
                         lookupStoreFactory.createWriter(
                                 localFile, bfGenerator.apply(file.rowCount()));
                 RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) 
{
-            PersistProcessor<T> processor = 
getOrCreateProcessor(currentSchemaId);
+            PersistProcessor<T> processor =
+                    getOrCreateProcessor(currentSchemaId, 
serializerFactory.version());
             KeyValue kv;
             if (processor.withPosition()) {
                 FileRecordIterator<KeyValue> batch;
@@ -258,8 +271,39 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
         }
     }
 
-    public String remoteSstSuffix() {
-        return remoteSstSuffix;
+    public Optional<RemoteSstFile> remoteSst(DataFileMeta file) {
+        Optional<String> sstFile =
+                file.extraFiles().stream()
+                        .filter(f -> f.endsWith(REMOTE_LOOKUP_FILE_SUFFIX))
+                        .findFirst();
+        if (!sstFile.isPresent()) {
+            return Optional.empty();
+        }
+
+        String sstFileName = sstFile.get();
+        String[] split = sstFileName.split("\\.");
+        if (split.length < 3) {
+            return Optional.empty();
+        }
+
+        String processorId = split[split.length - 3];
+        if (!processorFactory.identifier().equals(processorId)) {
+            return Optional.empty();
+        }
+
+        String serVersion = split[split.length - 2];
+        return Optional.of(new RemoteSstFile(sstFileName, serVersion));
+    }
+
+    public String newRemoteSst(DataFileMeta file, long length) {
+        return file.fileName()
+                + "."
+                + length
+                + "."
+                + processorFactory.identifier()
+                + "."
+                + serializerFactory.version()
+                + REMOTE_LOOKUP_FILE_SUFFIX;
     }
 
     @Override
@@ -269,4 +313,16 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
             lookupFileCache.invalidate(cachedFile);
         }
     }
+
+    /** Remote sst file with serVersion. */
+    public static class RemoteSstFile {
+
+        private final String sstFileName;
+        private final String serVersion;
+
+        private RemoteSstFile(String sstFileName, String serVersion) {
+            this.sstFileName = sstFileName;
+            this.serVersion = serVersion;
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/DefaultLookupSerializerFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/DefaultLookupSerializerFactory.java
index 54b79c9747..57df8a7e39 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/DefaultLookupSerializerFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/DefaultLookupSerializerFactory.java
@@ -30,7 +30,7 @@ import java.util.function.Function;
 public class DefaultLookupSerializerFactory implements LookupSerializerFactory 
{
 
     @Override
-    public String identifier() {
+    public String version() {
         return "v1";
     }
 
@@ -42,7 +42,10 @@ public class DefaultLookupSerializerFactory implements 
LookupSerializerFactory {
 
     @Override
     public Function<byte[], InternalRow> createDeserializer(
-            RowType currentSchema, @Nullable RowType fileSchema) {
+            String fileSerVersion, RowType currentSchema, @Nullable RowType 
fileSchema) {
+        if (!version().equals(fileSerVersion)) {
+            throw new UnsupportedOperationException();
+        }
         if (fileSchema != null && 
!fileSchema.equalsIgnoreNullable(currentSchema)) {
             throw new UnsupportedOperationException();
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/LookupSerializerFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/LookupSerializerFactory.java
index 4ab7cbc4ba..a7c8460758 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/LookupSerializerFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/LookupSerializerFactory.java
@@ -32,12 +32,12 @@ import java.util.function.Function;
 /** Factory to create serializer for lookup. */
 public interface LookupSerializerFactory {
 
-    String identifier();
+    String version();
 
     Function<InternalRow, byte[]> createSerializer(RowType currentSchema);
 
     Function<byte[], InternalRow> createDeserializer(
-            RowType currentSchema, @Nullable RowType fileSchema);
+            String fileSerVersion, RowType currentSchema, @Nullable RowType 
fileSchema);
 
     Supplier<LookupSerializerFactory> INSTANCE =
             Suppliers.memoize(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistEmptyProcessor.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistEmptyProcessor.java
index 7f7ab04a89..124d85394a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistEmptyProcessor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistEmptyProcessor.java
@@ -53,7 +53,9 @@ public class PersistEmptyProcessor implements 
PersistProcessor<Boolean> {
 
             @Override
             public PersistProcessor<Boolean> create(
-                    LookupSerializerFactory serializerFactory, @Nullable 
RowType fileSchema) {
+                    String fileSerVersion,
+                    LookupSerializerFactory serializerFactory,
+                    @Nullable RowType fileSchema) {
                 return new PersistEmptyProcessor();
             }
         };
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistPositionProcessor.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistPositionProcessor.java
index 14ee6a2870..91dcb4aff0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistPositionProcessor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistPositionProcessor.java
@@ -65,7 +65,9 @@ public class PersistPositionProcessor implements 
PersistProcessor<FilePosition>
 
             @Override
             public PersistProcessor<FilePosition> create(
-                    LookupSerializerFactory serializerFactory, @Nullable 
RowType fileSchema) {
+                    String fileSerVersion,
+                    LookupSerializerFactory serializerFactory,
+                    @Nullable RowType fileSchema) {
                 return new PersistPositionProcessor();
             }
         };
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistProcessor.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistProcessor.java
index 7858cd0ea6..3aa52e745a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistProcessor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistProcessor.java
@@ -43,6 +43,8 @@ public interface PersistProcessor<T> {
         String identifier();
 
         PersistProcessor<T> create(
-                LookupSerializerFactory serializerFactory, @Nullable RowType 
fileSchema);
+                String fileSerVersion,
+                LookupSerializerFactory serializerFactory,
+                @Nullable RowType fileSchema);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueAndPosProcessor.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueAndPosProcessor.java
index 75fb76af27..762676cdfd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueAndPosProcessor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueAndPosProcessor.java
@@ -85,10 +85,13 @@ public class PersistValueAndPosProcessor implements 
PersistProcessor<PositionedK
 
             @Override
             public PersistProcessor<PositionedKeyValue> create(
-                    LookupSerializerFactory serializerFactory, @Nullable 
RowType fileSchema) {
+                    String fileSerVersion,
+                    LookupSerializerFactory serializerFactory,
+                    @Nullable RowType fileSchema) {
                 return new PersistValueAndPosProcessor(
                         serializerFactory.createSerializer(valueType),
-                        serializerFactory.createDeserializer(valueType, 
fileSchema));
+                        serializerFactory.createDeserializer(
+                                fileSerVersion, valueType, fileSchema));
             }
         };
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueProcessor.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueProcessor.java
index 4696340fde..9ca462516c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueProcessor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueProcessor.java
@@ -73,10 +73,13 @@ public class PersistValueProcessor implements 
PersistProcessor<KeyValue> {
 
             @Override
             public PersistProcessor<KeyValue> create(
-                    LookupSerializerFactory serializerFactory, @Nullable 
RowType fileSchema) {
+                    String fileSerVersion,
+                    LookupSerializerFactory serializerFactory,
+                    @Nullable RowType fileSchema) {
                 return new PersistValueProcessor(
                         serializerFactory.createSerializer(valueType),
-                        serializerFactory.createDeserializer(valueType, 
fileSchema));
+                        serializerFactory.createDeserializer(
+                                fileSerVersion, valueType, fileSchema));
             }
         };
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteFileDownloader.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteFileDownloader.java
index a680f337f1..8b7a0d370a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteFileDownloader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteFileDownloader.java
@@ -25,5 +25,5 @@ import java.io.File;
 /** Downloader to try to download remote lookup file to local. */
 public interface RemoteFileDownloader {
 
-    boolean tryToDownload(DataFileMeta dataFile, File localFile);
+    boolean tryToDownload(DataFileMeta dataFile, String remoteSstFile, File 
localFile);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteLookupFileManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteLookupFileManager.java
index 927a7d4c7a..44b1a13680 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteLookupFileManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteLookupFileManager.java
@@ -37,7 +37,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 
 /** Manager to manage remote files for lookup. */
 public class RemoteLookupFileManager<T> implements RemoteFileDownloader {
@@ -66,14 +65,14 @@ public class RemoteLookupFileManager<T> implements 
RemoteFileDownloader {
             return file;
         }
 
-        if (remoteSst(file).isPresent()) {
+        if (lookupLevels.remoteSst(file).isPresent()) {
             // ignore existed
             return file;
         }
 
         LookupFile lookupFile = lookupLevels.createLookupFile(file);
         long length = lookupFile.localFile().length();
-        String remoteSstName = newRemoteSstName(file, length);
+        String remoteSstName = lookupLevels.newRemoteSst(file, length);
         Path sstFile = remoteSstPath(file, remoteSstName);
         try (FileInputStream is = new FileInputStream(lookupFile.localFile());
                 PositionOutputStream os = fileIO.newOutputStream(sstFile, 
false)) {
@@ -88,13 +87,8 @@ public class RemoteLookupFileManager<T> implements 
RemoteFileDownloader {
     }
 
     @Override
-    public boolean tryToDownload(DataFileMeta dataFile, File localFile) {
-        Optional<String> remoteSst = remoteSst(dataFile);
-        if (!remoteSst.isPresent()) {
-            return false;
-        }
-
-        Path remoteSstPath = remoteSstPath(dataFile, remoteSst.get());
+    public boolean tryToDownload(DataFileMeta dataFile, String remoteSstFile, 
File localFile) {
+        Path remoteSstPath = remoteSstPath(dataFile, remoteSstFile);
         try (SeekableInputStream is = fileIO.newInputStream(remoteSstPath);
                 FileOutputStream os = new FileOutputStream(localFile)) {
             IOUtils.copy(is, os);
@@ -105,16 +99,6 @@ public class RemoteLookupFileManager<T> implements 
RemoteFileDownloader {
         }
     }
 
-    private Optional<String> remoteSst(DataFileMeta file) {
-        return file.extraFiles().stream()
-                .filter(f -> f.endsWith(lookupLevels.remoteSstSuffix()))
-                .findFirst();
-    }
-
-    private String newRemoteSstName(DataFileMeta file, long length) {
-        return file.fileName() + "." + length + lookupLevels.remoteSstSuffix();
-    }
-
     private Path remoteSstPath(DataFileMeta file, String remoteSstName) {
         return new Path(pathFactory.toPath(file).getParent(), remoteSstName);
     }

Reply via email to