This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e34762dfad [core] Refactor footer of lookup sst file to SortLookupStoreFooter
e34762dfad is described below

commit e34762dfad5a80f5054c75205962963def89768c
Author: JingsongLi <[email protected]>
AuthorDate: Thu Dec 25 21:59:17 2025 +0800

    [core] Refactor footer of lookup sst file to SortLookupStoreFooter
---
 .../paimon/lookup/sort/SortLookupStoreFactory.java | 12 ++++-
 .../sort/SortLookupStoreFooter.java}               | 19 +++----
 .../paimon/lookup/sort/SortLookupStoreReader.java  | 32 ++++++++---
 .../paimon/lookup/sort/SortLookupStoreWriter.java  | 43 +++++++++++----
 .../java/org/apache/paimon/sst/BlockHandle.java    |  3 +-
 .../org/apache/paimon/sst/BloomFilterHandle.java   |  2 +-
 .../java/org/apache/paimon/sst/SstFileReader.java  | 41 ++++----------
 .../java/org/apache/paimon/sst/SstFileWriter.java  | 62 ++++++++++++----------
 .../apache/paimon/utils/FileBasedBloomFilter.java  | 21 ++++++++
 .../sort/SortLookupStoreTest.java}                 | 55 +++++++++----------
 .../org/apache/paimon/sst/BlockIteratorTest.java   |  1 +
 11 files changed, 173 insertions(+), 118 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
index cbf01d590e..546a68e18f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
@@ -20,6 +20,10 @@ package org.apache.paimon.lookup.sort;
 
 import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.LookupStoreFactory;
 import org.apache.paimon.memory.MemorySlice;
@@ -52,12 +56,16 @@ public class SortLookupStoreFactory implements LookupStoreFactory {
 
     @Override
     public SortLookupStoreReader createReader(File file) throws IOException {
-        return new SortLookupStoreReader(comparator, file, cacheManager);
+        Path filePath = new Path(file.getAbsolutePath());
+        SeekableInputStream input = 
LocalFileIO.INSTANCE.newInputStream(filePath);
+        return new SortLookupStoreReader(comparator, filePath, file.length(), 
input, cacheManager);
     }
 
     @Override
     public SortLookupStoreWriter createWriter(File file, @Nullable 
BloomFilter.Builder bloomFilter)
             throws IOException {
-        return new SortLookupStoreWriter(file, blockSize, bloomFilter, 
compressionFactory);
+        Path filePath = new Path(file.getAbsolutePath());
+        PositionOutputStream out = 
LocalFileIO.INSTANCE.newOutputStream(filePath, true);
+        return new SortLookupStoreWriter(out, blockSize, bloomFilter, 
compressionFactory);
     }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/Footer.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
similarity index 83%
rename from paimon-common/src/main/java/org/apache/paimon/sst/Footer.java
rename to paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
index 7855d18a94..9f4272096c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/Footer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
@@ -16,28 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.sst;
+package org.apache.paimon.lookup.sort;
 
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.memory.MemorySliceInput;
 import org.apache.paimon.memory.MemorySliceOutput;
+import org.apache.paimon.sst.BlockHandle;
+import org.apache.paimon.sst.BloomFilterHandle;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
-
 import static org.apache.paimon.sst.SstFileWriter.MAGIC_NUMBER;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Footer for a sorted file. */
-public class Footer {
+public class SortLookupStoreFooter {
 
     public static final int ENCODED_LENGTH = 36;
 
     @Nullable private final BloomFilterHandle bloomFilterHandle;
     private final BlockHandle indexBlockHandle;
 
-    Footer(@Nullable BloomFilterHandle bloomFilterHandle, BlockHandle 
indexBlockHandle) {
+    public SortLookupStoreFooter(
+            @Nullable BloomFilterHandle bloomFilterHandle, BlockHandle 
indexBlockHandle) {
         this.bloomFilterHandle = bloomFilterHandle;
         this.indexBlockHandle = indexBlockHandle;
     }
@@ -51,7 +52,7 @@ public class Footer {
         return indexBlockHandle;
     }
 
-    public static Footer readFooter(MemorySliceInput sliceInput) throws 
IOException {
+    public static SortLookupStoreFooter readFooter(MemorySliceInput 
sliceInput) {
         // read bloom filter and index handles
         @Nullable
         BloomFilterHandle bloomFilterHandle =
@@ -71,16 +72,16 @@ public class Footer {
         int magicNumber = sliceInput.readInt();
         checkArgument(magicNumber == MAGIC_NUMBER, "File is not a table (bad 
magic number)");
 
-        return new Footer(bloomFilterHandle, indexBlockHandle);
+        return new SortLookupStoreFooter(bloomFilterHandle, indexBlockHandle);
     }
 
-    public static MemorySlice writeFooter(Footer footer) {
+    public static MemorySlice writeFooter(SortLookupStoreFooter footer) {
         MemorySliceOutput output = new MemorySliceOutput(ENCODED_LENGTH);
         writeFooter(footer, output);
         return output.toSlice();
     }
 
-    public static void writeFooter(Footer footer, MemorySliceOutput 
sliceOutput) {
+    public static void writeFooter(SortLookupStoreFooter footer, 
MemorySliceOutput sliceOutput) {
         // write bloom filter and index handles
         if (footer.bloomFilterHandle == null) {
             sliceOutput.writeLong(0);
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
index ef862e6a3a..63b060fee9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
@@ -20,15 +20,16 @@ package org.apache.paimon.lookup.sort;
 
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.LookupStoreReader;
+import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.sst.BlockCache;
 import org.apache.paimon.sst.SstFileReader;
+import org.apache.paimon.utils.FileBasedBloomFilter;
 
 import javax.annotation.Nullable;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Comparator;
 
@@ -39,11 +40,24 @@ public class SortLookupStoreReader implements LookupStoreReader {
     private final SstFileReader reader;
 
     public SortLookupStoreReader(
-            Comparator<MemorySlice> comparator, File file, CacheManager 
cacheManager)
-            throws IOException {
-        Path filePath = new Path(file.getAbsolutePath());
-        this.input = LocalFileIO.INSTANCE.newInputStream(filePath);
-        this.reader = new SstFileReader(comparator, file.length(), filePath, 
input, cacheManager);
+            Comparator<MemorySlice> comparator,
+            Path filePath,
+            long fileLen,
+            SeekableInputStream input,
+            CacheManager cacheManager) {
+        this.input = input;
+        BlockCache blockCache = new BlockCache(filePath, input, cacheManager);
+        int footerLen = SortLookupStoreFooter.ENCODED_LENGTH;
+        MemorySegment footerData =
+                blockCache.getBlock(fileLen - footerLen, footerLen, b -> b, 
true);
+        SortLookupStoreFooter footer =
+                
SortLookupStoreFooter.readFooter(MemorySlice.wrap(footerData).toInput());
+        FileBasedBloomFilter bloomFilter =
+                FileBasedBloomFilter.create(
+                        input, filePath, cacheManager, 
footer.getBloomFilterHandle());
+        this.reader =
+                new SstFileReader(
+                        comparator, blockCache, footer.getIndexBlockHandle(), 
bloomFilter);
     }
 
     @Nullable
@@ -52,6 +66,10 @@ public class SortLookupStoreReader implements LookupStoreReader {
         return reader.lookup(key);
     }
 
+    public SstFileReader.SstFileIterator createIterator() {
+        return reader.createIterator();
+    }
+
     @Override
     public void close() throws IOException {
         reader.close();
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
index e8ed55b96b..e5fab5fc5c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
@@ -19,32 +19,49 @@
 package org.apache.paimon.lookup.sort;
 
 import org.apache.paimon.compression.BlockCompressionFactory;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
-import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.lookup.LookupStoreWriter;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.sst.BlockHandle;
+import org.apache.paimon.sst.BloomFilterHandle;
 import org.apache.paimon.sst.SstFileWriter;
 import org.apache.paimon.utils.BloomFilter;
 
 import javax.annotation.Nullable;
 
-import java.io.File;
 import java.io.IOException;
 
-/** A {@link LookupStoreWriter} backed by an {@link SstFileWriter}. */
+/**
+ * A {@link LookupStoreWriter} backed by an {@link SstFileWriter}. The SST 
File layout is as below:
+ * (For layouts of each block type, please refer to corresponding classes)
+ *
+ * <pre>
+ *     +-----------------------------------+------+
+ *     |             Footer                |      |
+ *     +-----------------------------------+      |
+ *     |           Index Block             |      +--> Loaded on open
+ *     +-----------------------------------+      |
+ *     |        Bloom Filter Block         |      |
+ *     +-----------------------------------+------+
+ *     |            Data Block             |      |
+ *     +-----------------------------------+      |
+ *     |              ......               |      +--> Loaded on requested
+ *     +-----------------------------------+      |
+ *     |            Data Block             |      |
+ *     +-----------------------------------+------+
+ * </pre>
+ */
 public class SortLookupStoreWriter implements LookupStoreWriter {
 
     private final SstFileWriter writer;
     private final PositionOutputStream out;
 
     public SortLookupStoreWriter(
-            File file,
+            PositionOutputStream out,
             int blockSize,
             @Nullable BloomFilter.Builder bloomFilter,
-            BlockCompressionFactory compressionFactory)
-            throws IOException {
-        Path filePath = new Path(file.getAbsolutePath());
-        this.out = LocalFileIO.INSTANCE.newOutputStream(filePath, true);
+            BlockCompressionFactory compressionFactory) {
+        this.out = out;
         this.writer = new SstFileWriter(out, blockSize, bloomFilter, 
compressionFactory);
     }
 
@@ -55,7 +72,13 @@ public class SortLookupStoreWriter implements LookupStoreWriter {
 
     @Override
     public void close() throws IOException {
-        writer.close();
+        writer.flush();
+        BloomFilterHandle bloomFilterHandle = writer.writeBloomFilter();
+        BlockHandle indexBlockHandle = writer.writeIndexBlock();
+        SortLookupStoreFooter footer =
+                new SortLookupStoreFooter(bloomFilterHandle, indexBlockHandle);
+        MemorySlice footerEncoding = SortLookupStoreFooter.writeFooter(footer);
+        writer.writeSlice(footerEncoding);
         out.close();
     }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java
index 60d4c5929c..0ced616cf3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java
@@ -24,12 +24,13 @@ import org.apache.paimon.memory.MemorySliceOutput;
 
 /** Handle for a block. */
 public class BlockHandle {
+
     public static final int MAX_ENCODED_LENGTH = 9 + 5;
 
     private final long offset;
     private final int size;
 
-    BlockHandle(long offset, int size) {
+    public BlockHandle(long offset, int size) {
         this.offset = offset;
         this.size = size;
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
index dbcb962481..05e9f684bc 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
@@ -27,7 +27,7 @@ public class BloomFilterHandle {
     private final int size;
     private final long expectedEntries;
 
-    BloomFilterHandle(long offset, int size, long expectedEntries) {
+    public BloomFilterHandle(long offset, int size, long expectedEntries) {
         this.offset = offset;
         this.size = size;
         this.expectedEntries = expectedEntries;
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
index 26148d0184..788e926dc7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
@@ -20,9 +20,6 @@ package org.apache.paimon.sst;
 
 import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.compression.BlockDecompressor;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.memory.MemorySliceInput;
@@ -48,39 +45,19 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 public class SstFileReader implements Closeable {
 
     private final Comparator<MemorySlice> comparator;
-    private final Path filePath;
     private final BlockCache blockCache;
     private final BlockReader indexBlock;
     @Nullable private final FileBasedBloomFilter bloomFilter;
 
     public SstFileReader(
             Comparator<MemorySlice> comparator,
-            long fileSize,
-            Path filePath,
-            SeekableInputStream input,
-            CacheManager cacheManager)
-            throws IOException {
+            BlockCache blockCache,
+            BlockHandle indexBlockHandle,
+            @Nullable FileBasedBloomFilter bloomFilter) {
         this.comparator = comparator;
-        this.filePath = filePath;
-        this.blockCache = new BlockCache(filePath, input, cacheManager);
-        MemorySegment footerData =
-                blockCache.getBlock(
-                        fileSize - Footer.ENCODED_LENGTH, 
Footer.ENCODED_LENGTH, b -> b, true);
-        Footer footer = 
Footer.readFooter(MemorySlice.wrap(footerData).toInput());
-        this.indexBlock = readBlock(footer.getIndexBlockHandle(), true);
-        BloomFilterHandle handle = footer.getBloomFilterHandle();
-        if (handle == null) {
-            this.bloomFilter = null;
-        } else {
-            this.bloomFilter =
-                    new FileBasedBloomFilter(
-                            input,
-                            filePath,
-                            cacheManager,
-                            handle.expectedEntries(),
-                            handle.offset(),
-                            handle.size());
-        }
+        this.blockCache = blockCache;
+        this.indexBlock = readBlock(indexBlockHandle, true);
+        this.bloomFilter = bloomFilter;
     }
 
     /**
@@ -154,8 +131,8 @@ public class SstFileReader implements Closeable {
         checkArgument(
                 blockTrailer.getCrc32c() == crc32cCode,
                 String.format(
-                        "Expected CRC32C(%d) but found CRC32C(%d) for 
file(%s)",
-                        blockTrailer.getCrc32c(), crc32cCode, filePath));
+                        "Expected CRC32C(%d) but found CRC32C(%d)",
+                        blockTrailer.getCrc32c(), crc32cCode));
 
         // decompress data
         BlockCompressionFactory compressionFactory =
@@ -184,11 +161,11 @@ public class SstFileReader implements Closeable {
             bloomFilter.close();
         }
         blockCache.close();
-        // do not need to close input, since it will be closed by outer classes
     }
 
     /** An Iterator for range queries. */
     public class SstFileIterator {
+
         private final BlockIterator indexIterator;
         private @Nullable BlockIterator seekedDataBlock = null;
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
index 603899378b..1a8e2d8bda 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.compression.BlockCompressionType;
 import org.apache.paimon.compression.BlockCompressor;
 import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.lookup.sort.SortLookupStoreFooter;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.options.MemorySize;
@@ -33,7 +34,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.IOException;
 
 import static org.apache.paimon.memory.MemorySegmentUtils.allocateReuseBytes;
@@ -43,26 +43,9 @@ import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
 
 /**
  * The writer for writing SST Files. SST Files are row-oriented and designed 
to serve frequent point
- * queries and range queries by key. The SST File layout is as below: (For 
layouts of each block
- * type, please refer to corresponding classes)
- *
- * <pre>
- *     +-----------------------------------+------+
- *     |             Footer                |      |
- *     +-----------------------------------+      |
- *     |           Index Block             |      +--> Loaded on open
- *     +-----------------------------------+      |
- *     |        Bloom Filter Block         |      |
- *     +-----------------------------------+------+
- *     |            Data Block             |      |
- *     +-----------------------------------+      |
- *     |              ......               |      +--> Loaded on requested
- *     +-----------------------------------+      |
- *     |            Data Block             |      |
- *     +-----------------------------------+------+
- * </pre>
+ * queries and range queries by key.
  */
-public class SstFileWriter implements Closeable {
+public class SstFileWriter {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SstFileWriter.class.getName());
 
@@ -77,7 +60,6 @@ public class SstFileWriter implements Closeable {
     @Nullable private final BlockCompressor blockCompressor;
 
     private byte[] lastKey;
-    private long position;
 
     private long recordCount;
     private long totalUncompressedSize;
@@ -127,7 +109,7 @@ public class SstFileWriter implements Closeable {
         recordCount++;
     }
 
-    private void flush() throws IOException {
+    public void flush() throws IOException {
         if (dataBlockWriter.size() == 0) {
             return;
         }
@@ -173,7 +155,7 @@ public class SstFileWriter implements Closeable {
         MemorySlice trailer = BlockTrailer.writeBlockTrailer(blockTrailer);
 
         // create a handle to this block
-        BlockHandle blockHandle = new BlockHandle(position, block.length());
+        BlockHandle blockHandle = new BlockHandle(out.getPos(), 
block.length());
 
         // write data
         writeSlice(block);
@@ -187,7 +169,28 @@ public class SstFileWriter implements Closeable {
         return blockHandle;
     }
 
-    @Override
+    @Nullable
+    public BloomFilterHandle writeBloomFilter() throws IOException {
+        if (bloomFilter == null) {
+            return null;
+        }
+        MemorySegment buffer = bloomFilter.getBuffer();
+        BloomFilterHandle bloomFilterHandle =
+                new BloomFilterHandle(out.getPos(), buffer.size(), 
bloomFilter.expectedEntries());
+        writeSlice(MemorySlice.wrap(buffer));
+        LOG.info("Bloom filter size: {} bytes", 
bloomFilter.getBuffer().size());
+        return bloomFilterHandle;
+    }
+
+    @Nullable
+    public BlockHandle writeIndexBlock() throws IOException {
+        BlockHandle indexBlock = writeBlock(indexBlockWriter);
+        LOG.info("Number of record: {}", recordCount);
+        LOG.info("totalUncompressedSize: {}", 
MemorySize.ofBytes(totalUncompressedSize));
+        LOG.info("totalCompressedSize: {}", 
MemorySize.ofBytes(totalCompressedSize));
+        return indexBlock;
+    }
+
     public void close() throws IOException {
         // flush current data block
         flush();
@@ -199,7 +202,8 @@ public class SstFileWriter implements Closeable {
         if (bloomFilter != null) {
             MemorySegment buffer = bloomFilter.getBuffer();
             bloomFilterHandle =
-                    new BloomFilterHandle(position, buffer.size(), 
bloomFilter.expectedEntries());
+                    new BloomFilterHandle(
+                            out.getPos(), buffer.size(), 
bloomFilter.expectedEntries());
             writeSlice(MemorySlice.wrap(buffer));
             LOG.info("Bloom filter size: {} bytes", 
bloomFilter.getBuffer().size());
         }
@@ -208,8 +212,9 @@ public class SstFileWriter implements Closeable {
         BlockHandle indexBlockHandle = writeBlock(indexBlockWriter);
 
         // write footer
-        Footer footer = new Footer(bloomFilterHandle, indexBlockHandle);
-        MemorySlice footerEncoding = Footer.writeFooter(footer);
+        SortLookupStoreFooter footer =
+                new SortLookupStoreFooter(bloomFilterHandle, indexBlockHandle);
+        MemorySlice footerEncoding = SortLookupStoreFooter.writeFooter(footer);
         writeSlice(footerEncoding);
 
         // do not need to close outputStream, since it will be closed by outer 
classes
@@ -218,8 +223,7 @@ public class SstFileWriter implements Closeable {
         LOG.info("totalCompressedSize: {}", 
MemorySize.ofBytes(totalCompressedSize));
     }
 
-    private void writeSlice(MemorySlice slice) throws IOException {
+    public void writeSlice(MemorySlice slice) throws IOException {
         out.write(slice.getHeapMemory(), slice.offset(), slice.length());
-        position += slice.length();
     }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
index f05bc6bcca..d6feca60ac 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
@@ -26,6 +26,9 @@ import org.apache.paimon.io.cache.CacheKey;
 import org.apache.paimon.io.cache.CacheKey.PositionCacheKey;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.sst.BloomFilterHandle;
+
+import javax.annotation.Nullable;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -58,6 +61,24 @@ public class FileBasedBloomFilter implements Closeable {
         this.cacheKey = CacheKey.forPosition(filePath, readOffset, readLength, 
true);
     }
 
+    @Nullable
+    public static FileBasedBloomFilter create(
+            SeekableInputStream input,
+            Path filePath,
+            CacheManager cacheManager,
+            @Nullable BloomFilterHandle bloomFilterHandle) {
+        if (bloomFilterHandle == null) {
+            return null;
+        }
+        return new FileBasedBloomFilter(
+                input,
+                filePath,
+                cacheManager,
+                bloomFilterHandle.expectedEntries(),
+                bloomFilterHandle.offset(),
+                bloomFilterHandle.size());
+    }
+
     public boolean testHash(int hash) {
         accessCount++;
         // we should refresh cache in LRU, but we cannot refresh everytime, it 
is costly.
diff --git a/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreTest.java
similarity index 87%
rename from paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java
rename to paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreTest.java
index 7e5642d333..235954e33d 100644
--- a/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.sst;
+package org.apache.paimon.lookup.sort;
 
 import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.compression.CompressOptions;
@@ -29,6 +29,10 @@ import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.memory.MemorySliceOutput;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.sst.BlockEntry;
+import org.apache.paimon.sst.BlockIterator;
+import org.apache.paimon.sst.SstFileReader;
+import org.apache.paimon.sst.SstFileWriter;
 import 
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
 import org.apache.paimon.testutils.junit.parameterized.Parameters;
 import org.apache.paimon.utils.BloomFilter;
@@ -41,6 +45,7 @@ import org.junit.jupiter.api.io.TempDir;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -50,8 +55,9 @@ import java.util.UUID;
 
 /** Test for {@link SstFileReader} and {@link SstFileWriter}. */
 @ExtendWith(ParameterizedTestExtension.class)
-public class SstFileTest {
-    private static final Logger LOG = 
LoggerFactory.getLogger(SstFileTest.class);
+public class SortLookupStoreTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SortLookupStoreTest.class);
 
     // 256 records per block
     private static final int BLOCK_SIZE = (10) * 256;
@@ -63,9 +69,9 @@ public class SstFileTest {
 
     private FileIO fileIO;
     private Path file;
-    private Path parent;
+    private File localFile;
 
-    public SstFileTest(List<Object> var) {
+    public SortLookupStoreTest(List<Object> var) {
         this.bloomFilterEnabled = (Boolean) var.get(0);
         this.compress = new CompressOptions((String) var.get(1), 1);
     }
@@ -85,8 +91,8 @@ public class SstFileTest {
     @BeforeEach
     public void beforeEach() {
         this.fileIO = LocalFileIO.create();
-        this.parent = new Path(tempPath.toUri());
         this.file = new Path(new Path(tempPath.toUri()), 
UUID.randomUUID().toString());
+        this.localFile = new File(file.toUri().getPath());
     }
 
     @TestTemplate
@@ -95,16 +101,18 @@ public class SstFileTest {
         innerTestLookup();
     }
 
+    private SortLookupStoreReader newReader() throws IOException {
+        SeekableInputStream input = LocalFileIO.INSTANCE.newInputStream(file);
+        return new SortLookupStoreReader(
+                Comparator.comparingInt(slice -> slice.readInt(0)),
+                file,
+                localFile.length(),
+                input,
+                CACHE_MANAGER);
+    }
+
     private void innerTestLookup() throws Exception {
-        long fileSize = fileIO.getFileSize(file);
-        try (SeekableInputStream inputStream = fileIO.newInputStream(file);
-                SstFileReader reader =
-                        new SstFileReader(
-                                Comparator.comparingInt(slice -> 
slice.readInt(0)),
-                                fileSize,
-                                file,
-                                inputStream,
-                                CACHE_MANAGER); ) {
+        try (SortLookupStoreReader reader = newReader()) {
             Random random = new Random();
             MemorySliceOutput keyOut = new MemorySliceOutput(4);
 
@@ -162,15 +170,7 @@ public class SstFileTest {
         int recordNum = 20000;
         writeData(recordNum, bloomFilterEnabled);
 
-        long fileSize = fileIO.getFileSize(file);
-        try (SeekableInputStream inputStream = fileIO.newInputStream(file);
-                SstFileReader reader =
-                        new SstFileReader(
-                                Comparator.comparingInt(slice -> 
slice.readInt(0)),
-                                fileSize,
-                                file,
-                                inputStream,
-                                CACHE_MANAGER); ) {
+        try (SortLookupStoreReader reader = newReader()) {
             SstFileReader.SstFileIterator fileIterator = 
reader.createIterator();
 
             MemorySliceOutput keyOut = new MemorySliceOutput(4);
@@ -230,7 +230,8 @@ public class SstFileTest {
         iterator.seekTo(keyOut.toSlice().getHeapMemory());
     }
 
-    private void interleaveLookup(SstFileReader reader, MemorySliceOutput 
keyOut) throws Exception {
+    private void interleaveLookup(SortLookupStoreReader reader, 
MemorySliceOutput keyOut)
+            throws Exception {
         keyOut.reset();
         keyOut.writeInt(0);
         reader.lookup(keyOut.toSlice().getHeapMemory());
@@ -243,8 +244,8 @@ public class SstFileTest {
         }
         BlockCompressionFactory compressionFactory = 
BlockCompressionFactory.create(compress);
         try (PositionOutputStream outputStream = fileIO.newOutputStream(file, 
true);
-                SstFileWriter writer =
-                        new SstFileWriter(
+                SortLookupStoreWriter writer =
+                        new SortLookupStoreWriter(
                                 outputStream, BLOCK_SIZE, bloomFilter, 
compressionFactory); ) {
             MemorySliceOutput keyOut = new MemorySliceOutput(4);
             MemorySliceOutput valueOut = new MemorySliceOutput(4);
diff --git a/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
index 06bfa71690..f17c436518 100644
--- a/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
 
 /** Test for {@link BlockIterator}. */
 public class BlockIteratorTest {
+
     private static final int ROW_NUM = 10_000;
     private static final Comparator<MemorySlice> COMPARATOR =
             Comparator.comparingInt(slice -> slice.readInt(0));

Reply via email to