This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4e04f3c7df [core] Pass file ser version to LookupSerializerFactory to let it deserialize from old version (#6677)
4e04f3c7df is described below
commit 4e04f3c7df49c410623c2b7b41c8b8b4189ec257
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 26 13:12:59 2025 +0800
[core] Pass file ser version to LookupSerializerFactory to let it deserialize from old version (#6677)
---
.../org/apache/paimon/mergetree/LookupFile.java | 13 ++-
.../org/apache/paimon/mergetree/LookupLevels.java | 102 ++++++++++++++++-----
.../lookup/DefaultLookupSerializerFactory.java | 7 +-
.../mergetree/lookup/LookupSerializerFactory.java | 4 +-
.../mergetree/lookup/PersistEmptyProcessor.java | 4 +-
.../mergetree/lookup/PersistPositionProcessor.java | 4 +-
.../paimon/mergetree/lookup/PersistProcessor.java | 4 +-
.../lookup/PersistValueAndPosProcessor.java | 7 +-
.../mergetree/lookup/PersistValueProcessor.java | 7 +-
.../mergetree/lookup/RemoteFileDownloader.java | 2 +-
.../mergetree/lookup/RemoteLookupFileManager.java | 24 +----
11 files changed, 122 insertions(+), 56 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
index 219d9295b6..ae59876f53 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
@@ -50,6 +50,7 @@ public class LookupFile {
private final File localFile;
private final int level;
private final long schemaId;
+ private final String serVersion;
private final LookupStoreReader reader;
private final Runnable callback;
@@ -58,10 +59,16 @@ public class LookupFile {
private boolean isClosed = false;
public LookupFile(
- File localFile, int level, long schemaId, LookupStoreReader
reader, Runnable callback) {
+ File localFile,
+ int level,
+ long schemaId,
+ String serVersion,
+ LookupStoreReader reader,
+ Runnable callback) {
this.localFile = localFile;
this.level = level;
this.schemaId = schemaId;
+ this.serVersion = serVersion;
this.reader = reader;
this.callback = callback;
}
@@ -74,6 +81,10 @@ public class LookupFile {
return schemaId;
}
+ public String serVersion() {
+ return serVersion;
+ }
+
@Nullable
public byte[] get(byte[] key) throws IOException {
checkArgument(!isClosed);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index 169d0923b2..f8dce8f2e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -34,6 +34,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BloomFilter;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.IOFunction;
+import org.apache.paimon.utils.Pair;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -45,6 +46,7 @@ import java.io.IOException;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@@ -68,8 +70,7 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
private final Function<Long, BloomFilter.Builder> bfGenerator;
private final Cache<String, LookupFile> lookupFileCache;
private final Set<String> ownCachedFiles;
- private final String remoteSstSuffix;
- private final Map<Long, PersistProcessor<T>> schemaIdToProcessors;
+ private final Map<Pair<Long, String>, PersistProcessor<T>>
schemaIdAndSerVersionToProcessors;
@Nullable private RemoteFileDownloader remoteFileDownloader;
@@ -99,13 +100,7 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
this.bfGenerator = bfGenerator;
this.lookupFileCache = lookupFileCache;
this.ownCachedFiles = new HashSet<>();
- this.remoteSstSuffix =
- "."
- + processorFactory.identifier()
- + "."
- + serializerFactory.identifier()
- + REMOTE_LOOKUP_FILE_SUFFIX;
- this.schemaIdToProcessors = new ConcurrentHashMap<>();
+ this.schemaIdAndSerVersionToProcessors = new ConcurrentHashMap<>();
levels.addDropFileCallback(this);
}
@@ -170,17 +165,17 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
return null;
}
- return getOrCreateProcessor(lookupFile.schemaId())
+ return getOrCreateProcessor(lookupFile.schemaId(),
lookupFile.serVersion())
.readFromDisk(key, lookupFile.level(), valueBytes,
file.fileName());
}
- private PersistProcessor<T> getOrCreateProcessor(long schemaId) {
- return schemaIdToProcessors.computeIfAbsent(
- schemaId,
+ private PersistProcessor<T> getOrCreateProcessor(long schemaId, String
serVersion) {
+ return schemaIdAndSerVersionToProcessors.computeIfAbsent(
+ Pair.of(schemaId, serVersion),
id -> {
RowType fileSchema =
schemaId == currentSchemaId ? null :
schemaFunction.apply(schemaId);
- return processorFactory.create(serializerFactory,
fileSchema);
+ return processorFactory.create(serVersion,
serializerFactory, fileSchema);
});
}
@@ -191,9 +186,12 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
}
long schemaId = this.currentSchemaId;
- if (tryToDownloadRemoteSst(file, localFile)) {
+ String fileSerVersion = serializerFactory.version();
+ Optional<String> downloadSerVersion = tryToDownloadRemoteSst(file,
localFile);
+ if (downloadSerVersion.isPresent()) {
// use schema id from remote file
schemaId = file.schemaId();
+ fileSerVersion = downloadSerVersion.get();
} else {
createSstFileFromDataFile(file, localFile);
}
@@ -203,21 +201,35 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
localFile,
file.level(),
schemaId,
+ fileSerVersion,
lookupStoreFactory.createReader(localFile),
() -> ownCachedFiles.remove(file.fileName()));
}
- private boolean tryToDownloadRemoteSst(DataFileMeta file, File localFile) {
+ private Optional<String> tryToDownloadRemoteSst(DataFileMeta file, File
localFile) {
if (remoteFileDownloader == null) {
- return false;
+ return Optional.empty();
+ }
+ Optional<RemoteSstFile> remoteSstFile = remoteSst(file);
+ if (!remoteSstFile.isPresent()) {
+ return Optional.empty();
}
+
+ RemoteSstFile remoteSst = remoteSstFile.get();
+
// validate schema matched, no exception here
try {
- getOrCreateProcessor(file.schemaId());
+ getOrCreateProcessor(file.schemaId(), remoteSst.serVersion);
} catch (UnsupportedOperationException e) {
- return false;
+ return Optional.empty();
+ }
+ boolean success =
+ remoteFileDownloader.tryToDownload(file,
remoteSst.sstFileName, localFile);
+ if (!success) {
+ return Optional.empty();
}
- return remoteFileDownloader.tryToDownload(file, localFile);
+
+ return Optional.of(remoteSst.serVersion);
}
public void addLocalFile(DataFileMeta file, LookupFile lookupFile) {
@@ -229,7 +241,8 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
lookupStoreFactory.createWriter(
localFile, bfGenerator.apply(file.rowCount()));
RecordReader<KeyValue> reader = fileReaderFactory.apply(file))
{
- PersistProcessor<T> processor =
getOrCreateProcessor(currentSchemaId);
+ PersistProcessor<T> processor =
+ getOrCreateProcessor(currentSchemaId,
serializerFactory.version());
KeyValue kv;
if (processor.withPosition()) {
FileRecordIterator<KeyValue> batch;
@@ -258,8 +271,39 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
}
}
- public String remoteSstSuffix() {
- return remoteSstSuffix;
+ public Optional<RemoteSstFile> remoteSst(DataFileMeta file) {
+ Optional<String> sstFile =
+ file.extraFiles().stream()
+ .filter(f -> f.endsWith(REMOTE_LOOKUP_FILE_SUFFIX))
+ .findFirst();
+ if (!sstFile.isPresent()) {
+ return Optional.empty();
+ }
+
+ String sstFileName = sstFile.get();
+ String[] split = sstFileName.split("\\.");
+ if (split.length < 3) {
+ return Optional.empty();
+ }
+
+ String processorId = split[split.length - 3];
+ if (!processorFactory.identifier().equals(processorId)) {
+ return Optional.empty();
+ }
+
+ String serVersion = split[split.length - 2];
+ return Optional.of(new RemoteSstFile(sstFileName, serVersion));
+ }
+
+ public String newRemoteSst(DataFileMeta file, long length) {
+ return file.fileName()
+ + "."
+ + length
+ + "."
+ + processorFactory.identifier()
+ + "."
+ + serializerFactory.version()
+ + REMOTE_LOOKUP_FILE_SUFFIX;
}
@Override
@@ -269,4 +313,16 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
lookupFileCache.invalidate(cachedFile);
}
}
+
+ /** Remote sst file with serVersion. */
+ public static class RemoteSstFile {
+
+ private final String sstFileName;
+ private final String serVersion;
+
+ private RemoteSstFile(String sstFileName, String serVersion) {
+ this.sstFileName = sstFileName;
+ this.serVersion = serVersion;
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/DefaultLookupSerializerFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/DefaultLookupSerializerFactory.java
index 54b79c9747..57df8a7e39 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/DefaultLookupSerializerFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/DefaultLookupSerializerFactory.java
@@ -30,7 +30,7 @@ import java.util.function.Function;
public class DefaultLookupSerializerFactory implements LookupSerializerFactory
{
@Override
- public String identifier() {
+ public String version() {
return "v1";
}
@@ -42,7 +42,10 @@ public class DefaultLookupSerializerFactory implements
LookupSerializerFactory {
@Override
public Function<byte[], InternalRow> createDeserializer(
- RowType currentSchema, @Nullable RowType fileSchema) {
+ String fileSerVersion, RowType currentSchema, @Nullable RowType
fileSchema) {
+ if (!version().equals(fileSerVersion)) {
+ throw new UnsupportedOperationException();
+ }
if (fileSchema != null &&
!fileSchema.equalsIgnoreNullable(currentSchema)) {
throw new UnsupportedOperationException();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/LookupSerializerFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/LookupSerializerFactory.java
index 4ab7cbc4ba..a7c8460758 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/LookupSerializerFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/LookupSerializerFactory.java
@@ -32,12 +32,12 @@ import java.util.function.Function;
/** Factory to create serializer for lookup. */
public interface LookupSerializerFactory {
- String identifier();
+ String version();
Function<InternalRow, byte[]> createSerializer(RowType currentSchema);
Function<byte[], InternalRow> createDeserializer(
- RowType currentSchema, @Nullable RowType fileSchema);
+ String fileSerVersion, RowType currentSchema, @Nullable RowType
fileSchema);
Supplier<LookupSerializerFactory> INSTANCE =
Suppliers.memoize(
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistEmptyProcessor.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistEmptyProcessor.java
index 7f7ab04a89..124d85394a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistEmptyProcessor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistEmptyProcessor.java
@@ -53,7 +53,9 @@ public class PersistEmptyProcessor implements
PersistProcessor<Boolean> {
@Override
public PersistProcessor<Boolean> create(
- LookupSerializerFactory serializerFactory, @Nullable
RowType fileSchema) {
+ String fileSerVersion,
+ LookupSerializerFactory serializerFactory,
+ @Nullable RowType fileSchema) {
return new PersistEmptyProcessor();
}
};
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistPositionProcessor.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistPositionProcessor.java
index 14ee6a2870..91dcb4aff0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistPositionProcessor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistPositionProcessor.java
@@ -65,7 +65,9 @@ public class PersistPositionProcessor implements
PersistProcessor<FilePosition>
@Override
public PersistProcessor<FilePosition> create(
- LookupSerializerFactory serializerFactory, @Nullable
RowType fileSchema) {
+ String fileSerVersion,
+ LookupSerializerFactory serializerFactory,
+ @Nullable RowType fileSchema) {
return new PersistPositionProcessor();
}
};
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistProcessor.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistProcessor.java
index 7858cd0ea6..3aa52e745a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistProcessor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistProcessor.java
@@ -43,6 +43,8 @@ public interface PersistProcessor<T> {
String identifier();
PersistProcessor<T> create(
- LookupSerializerFactory serializerFactory, @Nullable RowType
fileSchema);
+ String fileSerVersion,
+ LookupSerializerFactory serializerFactory,
+ @Nullable RowType fileSchema);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueAndPosProcessor.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueAndPosProcessor.java
index 75fb76af27..762676cdfd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueAndPosProcessor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueAndPosProcessor.java
@@ -85,10 +85,13 @@ public class PersistValueAndPosProcessor implements
PersistProcessor<PositionedK
@Override
public PersistProcessor<PositionedKeyValue> create(
- LookupSerializerFactory serializerFactory, @Nullable
RowType fileSchema) {
+ String fileSerVersion,
+ LookupSerializerFactory serializerFactory,
+ @Nullable RowType fileSchema) {
return new PersistValueAndPosProcessor(
serializerFactory.createSerializer(valueType),
- serializerFactory.createDeserializer(valueType,
fileSchema));
+ serializerFactory.createDeserializer(
+ fileSerVersion, valueType, fileSchema));
}
};
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueProcessor.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueProcessor.java
index 4696340fde..9ca462516c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueProcessor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/PersistValueProcessor.java
@@ -73,10 +73,13 @@ public class PersistValueProcessor implements
PersistProcessor<KeyValue> {
@Override
public PersistProcessor<KeyValue> create(
- LookupSerializerFactory serializerFactory, @Nullable
RowType fileSchema) {
+ String fileSerVersion,
+ LookupSerializerFactory serializerFactory,
+ @Nullable RowType fileSchema) {
return new PersistValueProcessor(
serializerFactory.createSerializer(valueType),
- serializerFactory.createDeserializer(valueType,
fileSchema));
+ serializerFactory.createDeserializer(
+ fileSerVersion, valueType, fileSchema));
}
};
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteFileDownloader.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteFileDownloader.java
index a680f337f1..8b7a0d370a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteFileDownloader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteFileDownloader.java
@@ -25,5 +25,5 @@ import java.io.File;
/** Downloader to try to download remote lookup file to local. */
public interface RemoteFileDownloader {
- boolean tryToDownload(DataFileMeta dataFile, File localFile);
+ boolean tryToDownload(DataFileMeta dataFile, String remoteSstFile, File
localFile);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteLookupFileManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteLookupFileManager.java
index 927a7d4c7a..44b1a13680 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteLookupFileManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/lookup/RemoteLookupFileManager.java
@@ -37,7 +37,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
/** Manager to manage remote files for lookup. */
public class RemoteLookupFileManager<T> implements RemoteFileDownloader {
@@ -66,14 +65,14 @@ public class RemoteLookupFileManager<T> implements
RemoteFileDownloader {
return file;
}
- if (remoteSst(file).isPresent()) {
+ if (lookupLevels.remoteSst(file).isPresent()) {
// ignore existed
return file;
}
LookupFile lookupFile = lookupLevels.createLookupFile(file);
long length = lookupFile.localFile().length();
- String remoteSstName = newRemoteSstName(file, length);
+ String remoteSstName = lookupLevels.newRemoteSst(file, length);
Path sstFile = remoteSstPath(file, remoteSstName);
try (FileInputStream is = new FileInputStream(lookupFile.localFile());
PositionOutputStream os = fileIO.newOutputStream(sstFile,
false)) {
@@ -88,13 +87,8 @@ public class RemoteLookupFileManager<T> implements
RemoteFileDownloader {
}
@Override
- public boolean tryToDownload(DataFileMeta dataFile, File localFile) {
- Optional<String> remoteSst = remoteSst(dataFile);
- if (!remoteSst.isPresent()) {
- return false;
- }
-
- Path remoteSstPath = remoteSstPath(dataFile, remoteSst.get());
+ public boolean tryToDownload(DataFileMeta dataFile, String remoteSstFile,
File localFile) {
+ Path remoteSstPath = remoteSstPath(dataFile, remoteSstFile);
try (SeekableInputStream is = fileIO.newInputStream(remoteSstPath);
FileOutputStream os = new FileOutputStream(localFile)) {
IOUtils.copy(is, os);
@@ -105,16 +99,6 @@ public class RemoteLookupFileManager<T> implements
RemoteFileDownloader {
}
}
- private Optional<String> remoteSst(DataFileMeta file) {
- return file.extraFiles().stream()
- .filter(f -> f.endsWith(lookupLevels.remoteSstSuffix()))
- .findFirst();
- }
-
- private String newRemoteSstName(DataFileMeta file, long length) {
- return file.fileName() + "." + length + lookupLevels.remoteSstSuffix();
- }
-
private Path remoteSstPath(DataFileMeta file, String remoteSstName) {
return new Path(pathFactory.toPath(file).getParent(), remoteSstName);
}