This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d91da16ecf [core] Add file size to the name of remote sst (#6615)
d91da16ecf is described below
commit d91da16ecfe654ce031fcfd148279a45cf1b7db4
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Nov 18 23:09:55 2025 +0800
[core] Add file size to the name of remote sst (#6615)
---
.../org/apache/paimon/mergetree/LookupLevels.java | 16 ++++++++-------
.../mergetree/compact/RemoteLookupFileManager.java | 24 ++++++++++++++++------
.../flink/lookup/LookupRemoteFileTableTest.java | 11 ++++++++--
3 files changed, 36 insertions(+), 15 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index f90e22cd10..0e13d139e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -68,6 +68,7 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
private final Function<Long, BloomFilter.Builder> bfGenerator;
private final Cache<String, LookupFile> lookupFileCache;
private final Set<String> ownCachedFiles;
+ private final String remoteSstSuffix;
@Nullable private RemoteFileDownloader remoteFileDownloader;
@@ -91,6 +92,12 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
this.bfGenerator = bfGenerator;
this.lookupFileCache = lookupFileCache;
this.ownCachedFiles = new HashSet<>();
+ this.remoteSstSuffix =
+ "."
+ + persistProcessor.identifier()
+ + "."
+ + CURRENT_VERSION
+ + REMOTE_LOOKUP_FILE_SUFFIX;
levels.addDropFileCallback(this);
}
@@ -214,13 +221,8 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
}
}
- public String remoteSstName(String dataFileName) {
- return dataFileName
- + "."
- + persistProcessor.identifier()
- + "."
- + CURRENT_VERSION
- + REMOTE_LOOKUP_FILE_SUFFIX;
+ public String remoteSstSuffix() {
+ return remoteSstSuffix;
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
index 0751d99cd8..4303e2374c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/RemoteLookupFileManager.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/** Manager to manage remote files for lookup. */
public class RemoteLookupFileManager<T> implements RemoteFileDownloader {
@@ -75,14 +76,15 @@ public class RemoteLookupFileManager<T> implements RemoteFileDownloader {
return file;
}
- String remoteSstName = lookupLevels.remoteSstName(file.fileName());
- if (file.extraFiles().contains(remoteSstName)) {
+ if (remoteSst(file).isPresent()) {
// ignore existed
return file;
}
- Path sstFile = remoteSstPath(file, remoteSstName);
LookupFile lookupFile = lookupLevels.createLookupFile(file);
+ long length = lookupFile.localFile().length();
+ String remoteSstName = newRemoteSstName(file, length);
+ Path sstFile = remoteSstPath(file, remoteSstName);
try (FileInputStream is = new FileInputStream(lookupFile.localFile());
PositionOutputStream os = fileIO.newOutputStream(sstFile,
false)) {
IOUtils.copy(is, os);
@@ -107,9 +109,9 @@ public class RemoteLookupFileManager<T> implements RemoteFileDownloader {
}
}
- String remoteSstName = lookupLevels.remoteSstName(dataFile.fileName());
- if (dataFile.extraFiles().contains(remoteSstName)) {
- Path remoteSstPath = remoteSstPath(dataFile, remoteSstName);
+ Optional<String> remoteSst = remoteSst(dataFile);
+ if (remoteSst.isPresent()) {
+ Path remoteSstPath = remoteSstPath(dataFile, remoteSst.get());
try (SeekableInputStream is = fileIO.newInputStream(remoteSstPath);
FileOutputStream os = new FileOutputStream(localFile)) {
IOUtils.copy(is, os);
@@ -121,6 +123,16 @@ public class RemoteLookupFileManager<T> implements RemoteFileDownloader {
return false;
}
+ private Optional<String> remoteSst(DataFileMeta file) {
+ return file.extraFiles().stream()
+ .filter(f -> f.endsWith(lookupLevels.remoteSstSuffix()))
+ .findFirst();
+ }
+
+ private String newRemoteSstName(DataFileMeta file, long length) {
+ return file.fileName() + "." + length + lookupLevels.remoteSstSuffix();
+ }
+
private Path remoteSstPath(DataFileMeta file, String remoteSstName) {
return new Path(pathFactory.toPath(file).getParent(), remoteSstName);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupRemoteFileTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupRemoteFileTableTest.java
index 61f3b65a85..24c3195292 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupRemoteFileTableTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupRemoteFileTableTest.java
@@ -96,6 +96,7 @@ public class LookupRemoteFileTableTest extends TableTestBase {
catalog.createTable(identifier, schema, false);
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ LocalFileIO fileIO = LocalFileIO.create();
// first write
try (BatchTableWrite write =
writeBuilder.newWrite().withIOManager(ioManager);
@@ -122,12 +123,18 @@ public class LookupRemoteFileTableTest extends TableTestBase {
DataSplit firstSplit = (DataSplit) splits.get(0);
DataFileMeta firstFile = firstSplit.dataFiles().get(0);
List<String> extraFiles = firstFile.extraFiles();
- assertThat(extraFiles.get(0)).endsWith(".position.v1.lookup");
+ String extraFile = extraFiles.get(0);
+ //
data-410685c7-4cc2-47d7-9dec-393f6cfe9d64-0.parquet.115.position.v1.lookup
+ assertThat(extraFile).endsWith(".position.v1.lookup");
+ long lookupFileSize =
+ fileIO.getFileSize(
+ new Path(new Path(tempPath.toUri()),
"default.db/t/bucket-0/" + extraFile));
+ String[] split = extraFile.split("\\.");
+ assertThat(split[split.length -
4]).isEqualTo(String.valueOf(lookupFileSize));
// third write with lookup but no data file
// delete file first
- LocalFileIO fileIO = LocalFileIO.create();
Path firstPath =
table.store()
.pathFactory()