This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 432fad719e [core] Extract an SST File Format from LookupStore. (#6755)
432fad719e is described below

commit 432fad719ec93bcc902a16e3b31f374be473d339
Author: Faiz <[email protected]>
AuthorDate: Sat Dec 6 20:04:14 2025 +0800

    [core] Extract an SST File Format from LookupStore. (#6755)
---
 .../java/org/apache/paimon/io/cache/CacheKey.java  |  19 +--
 .../paimon/lookup/sort/SortLookupStoreFactory.java |   2 +-
 .../paimon/lookup/sort/SortLookupStoreReader.java  | 150 +++--------------
 .../paimon/lookup/sort/SortLookupStoreWriter.java  | 177 +++-----------------
 .../{lookup/sort => sst}/BlockAlignedType.java     |   2 +-
 .../paimon/{lookup/sort => sst}/BlockCache.java    |  27 ++--
 .../paimon/{lookup/sort => sst}/BlockEntry.java    |   2 +-
 .../paimon/{lookup/sort => sst}/BlockHandle.java   |   2 +-
 .../paimon/{lookup/sort => sst}/BlockIterator.java |   2 +-
 .../paimon/{lookup/sort => sst}/BlockReader.java   |   4 +-
 .../paimon/{lookup/sort => sst}/BlockTrailer.java  |   2 +-
 .../paimon/{lookup/sort => sst}/BlockWriter.java   |  32 +++-
 .../{lookup/sort => sst}/BloomFilterHandle.java    |   2 +-
 .../apache/paimon/{lookup/sort => sst}/Footer.java |   4 +-
 .../paimon/{lookup/sort => sst}/SortContext.java   |   2 +-
 .../SstFileReader.java}                            |  48 +++---
 .../SstFileUtils.java}                             |   4 +-
 .../SstFileWriter.java}                            |  61 ++++---
 .../apache/paimon/utils/FileBasedBloomFilter.java  |  25 +--
 .../{lookup/sort => sst}/BlockIteratorTest.java    |   2 +-
 .../java/org/apache/paimon/sst/SstFileTest.java    | 178 +++++++++++++++++++++
 .../paimon/utils/FileBasedBloomFilterTest.java     |  13 +-
 22 files changed, 376 insertions(+), 384 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java 
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
index 11b8beb22c..74bca4c593 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
@@ -18,14 +18,16 @@
 
 package org.apache.paimon.io.cache;
 
+import org.apache.paimon.fs.Path;
+
 import java.io.RandomAccessFile;
 import java.util.Objects;
 
 /** Key for cache manager. */
 public interface CacheKey {
 
-    static CacheKey forPosition(RandomAccessFile file, long position, int 
length, boolean isIndex) {
-        return new PositionCacheKey(file, position, length, isIndex);
+    static CacheKey forPosition(Path filePath, long position, int length, 
boolean isIndex) {
+        return new PositionCacheKey(filePath, position, length, isIndex);
     }
 
     static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int 
pageIndex) {
@@ -35,17 +37,16 @@ public interface CacheKey {
     /** @return Whether this cache key is for index cache. */
     boolean isIndex();
 
-    /** Key for file position and length. */
+    /** Key for a position and length within a file given by its path (possibly remote). */
     class PositionCacheKey implements CacheKey {
 
-        private final RandomAccessFile file;
+        private final Path filePath;
         private final long position;
         private final int length;
         private final boolean isIndex;
 
-        private PositionCacheKey(
-                RandomAccessFile file, long position, int length, boolean 
isIndex) {
-            this.file = file;
+        private PositionCacheKey(Path filePath, long position, int length, 
boolean isIndex) {
+            this.filePath = filePath;
             this.position = position;
             this.length = length;
             this.isIndex = isIndex;
@@ -63,12 +64,12 @@ public interface CacheKey {
             return position == that.position
                     && length == that.length
                     && isIndex == that.isIndex
-                    && Objects.equals(file, that.file);
+                    && Objects.equals(filePath, that.filePath);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(file, position, length, isIndex);
+            return Objects.hash(filePath, position, length, isIndex);
         }
 
         @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
index 7dcacc9ad1..cbf01d590e 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
@@ -52,7 +52,7 @@ public class SortLookupStoreFactory implements 
LookupStoreFactory {
 
     @Override
     public SortLookupStoreReader createReader(File file) throws IOException {
-        return new SortLookupStoreReader(comparator, file, blockSize, 
cacheManager);
+        return new SortLookupStoreReader(comparator, file, cacheManager);
     }
 
     @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
index fd068baaa2..684273ae15 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
@@ -18,16 +18,14 @@
 
 package org.apache.paimon.lookup.sort;
 
-import org.apache.paimon.compression.BlockCompressionFactory;
-import org.apache.paimon.compression.BlockDecompressor;
-import org.apache.paimon.io.PageFileInput;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.LookupStoreReader;
-import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySlice;
-import org.apache.paimon.memory.MemorySliceInput;
-import org.apache.paimon.utils.FileBasedBloomFilter;
-import org.apache.paimon.utils.MurmurHashUtils;
+import org.apache.paimon.sst.SstFileReader;
 
 import javax.annotation.Nullable;
 
@@ -35,142 +33,34 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Comparator;
 
-import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/** A {@link LookupStoreReader} for sort store. */
+/** A {@link LookupStoreReader} backed by an {@link SstFileReader}. */
 public class SortLookupStoreReader implements LookupStoreReader {
 
-    private final Comparator<MemorySlice> comparator;
-    private final String filePath;
-    private final long fileSize;
-
-    private final BlockIterator indexBlockIterator;
-    @Nullable private FileBasedBloomFilter bloomFilter;
-    private final BlockCache blockCache;
-    private final PageFileInput fileInput;
+    private final FileIO fileIO;
+    private final SeekableInputStream input;
+    private final SstFileReader sstFileReader;
 
     public SortLookupStoreReader(
-            Comparator<MemorySlice> comparator, File file, int blockSize, 
CacheManager cacheManager)
+            Comparator<MemorySlice> comparator, File file, CacheManager 
cacheManager)
             throws IOException {
-        this.comparator = comparator;
-        this.filePath = file.getAbsolutePath();
-        this.fileSize = file.length();
-
-        this.fileInput = PageFileInput.create(file, blockSize, null, fileSize, 
null);
-        this.blockCache = new BlockCache(fileInput.file(), cacheManager);
-        Footer footer = readFooter();
-        this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(), 
true).iterator();
-        BloomFilterHandle handle = footer.getBloomFilterHandle();
-        if (handle != null) {
-            this.bloomFilter =
-                    new FileBasedBloomFilter(
-                            fileInput,
-                            cacheManager,
-                            handle.expectedEntries(),
-                            handle.offset(),
-                            handle.size());
-        }
-    }
-
-    private Footer readFooter() throws IOException {
-        MemorySegment footerData =
-                blockCache.getBlock(
-                        fileSize - Footer.ENCODED_LENGTH, 
Footer.ENCODED_LENGTH, b -> b, true);
-        return Footer.readFooter(MemorySlice.wrap(footerData).toInput());
+        final Path filePath = new Path(file.getAbsolutePath());
+        this.fileIO = LocalFileIO.create();
+        this.input = fileIO.newInputStream(filePath);
+        this.sstFileReader =
+                new SstFileReader(comparator, file.length(), filePath, input, 
cacheManager);
     }
 
     @Nullable
     @Override
     public byte[] lookup(byte[] key) throws IOException {
-        if (bloomFilter != null && 
!bloomFilter.testHash(MurmurHashUtils.hashBytes(key))) {
-            return null;
-        }
-
-        MemorySlice keySlice = MemorySlice.wrap(key);
-        // seek the index to the block containing the key
-        indexBlockIterator.seekTo(keySlice);
-
-        // if indexIterator does not have a next, it means the key does not 
exist in this iterator
-        if (indexBlockIterator.hasNext()) {
-            // seek the current iterator to the key
-            BlockIterator current = getNextBlock();
-            if (current.seekTo(keySlice)) {
-                return current.next().getValue().copyBytes();
-            }
-        }
-        return null;
-    }
-
-    private BlockIterator getNextBlock() {
-        // index block handle, point to the key, value position.
-        MemorySlice blockHandle = indexBlockIterator.next().getValue();
-        BlockReader dataBlock =
-                readBlock(BlockHandle.readBlockHandle(blockHandle.toInput()), 
false);
-        return dataBlock.iterator();
-    }
-
-    /**
-     * @param blockHandle The block handle.
-     * @param index Whether read the block as an index.
-     * @return The reader of the target block.
-     */
-    private BlockReader readBlock(BlockHandle blockHandle, boolean index) {
-        // read block trailer
-        MemorySegment trailerData =
-                blockCache.getBlock(
-                        blockHandle.offset() + blockHandle.size(),
-                        BlockTrailer.ENCODED_LENGTH,
-                        b -> b,
-                        true);
-        BlockTrailer blockTrailer =
-                
BlockTrailer.readBlockTrailer(MemorySlice.wrap(trailerData).toInput());
-
-        MemorySegment unCompressedBlock =
-                blockCache.getBlock(
-                        blockHandle.offset(),
-                        blockHandle.size(),
-                        bytes -> decompressBlock(bytes, blockTrailer),
-                        index);
-        return new BlockReader(MemorySlice.wrap(unCompressedBlock), 
comparator);
-    }
-
-    private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer 
blockTrailer) {
-        MemorySegment compressed = MemorySegment.wrap(compressedBytes);
-        int crc32cCode = crc32c(compressed, blockTrailer.getCompressionType());
-        checkArgument(
-                blockTrailer.getCrc32c() == crc32cCode,
-                String.format(
-                        "Expected CRC32C(%d) but found CRC32C(%d) for 
file(%s)",
-                        blockTrailer.getCrc32c(), crc32cCode, filePath));
-
-        // decompress data
-        BlockCompressionFactory compressionFactory =
-                
BlockCompressionFactory.create(blockTrailer.getCompressionType());
-        if (compressionFactory == null) {
-            return compressedBytes;
-        } else {
-            MemorySliceInput compressedInput = 
MemorySlice.wrap(compressed).toInput();
-            byte[] uncompressed = new byte[compressedInput.readVarLenInt()];
-            BlockDecompressor decompressor = 
compressionFactory.getDecompressor();
-            int uncompressedLength =
-                    decompressor.decompress(
-                            compressed.getHeapMemory(),
-                            compressedInput.position(),
-                            compressedInput.available(),
-                            uncompressed,
-                            0);
-            checkArgument(uncompressedLength == uncompressed.length);
-            return uncompressed;
-        }
+        return sstFileReader.lookup(key);
     }
 
     @Override
     public void close() throws IOException {
-        if (bloomFilter != null) {
-            bloomFilter.close();
-        }
-        blockCache.close();
-        fileInput.close();
+        // be careful about the close order
+        sstFileReader.close();
+        input.close();
+        fileIO.close();
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
index 31f09cfbc6..366e4560ac 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
@@ -19,185 +19,46 @@
 package org.apache.paimon.lookup.sort;
 
 import org.apache.paimon.compression.BlockCompressionFactory;
-import org.apache.paimon.compression.BlockCompressionType;
-import org.apache.paimon.compression.BlockCompressor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.lookup.LookupStoreWriter;
-import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.memory.MemorySlice;
-import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.sst.SstFileWriter;
 import org.apache.paimon.utils.BloomFilter;
-import org.apache.paimon.utils.MurmurHashUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.apache.paimon.lookup.sort.BlockHandle.writeBlockHandle;
-import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c;
-import static org.apache.paimon.memory.MemorySegmentUtils.allocateReuseBytes;
-import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
 
-/** A {@link LookupStoreWriter} for sorting. */
+/** A {@link LookupStoreWriter} backed by an {@link SstFileWriter}. */
 public class SortLookupStoreWriter implements LookupStoreWriter {
+    private final SstFileWriter sstFileWriter;
+    private final FileIO fileIO;
+    private final PositionOutputStream out;
 
-    private static final Logger LOG =
-            LoggerFactory.getLogger(SortLookupStoreWriter.class.getName());
-
-    public static final int MAGIC_NUMBER = 1481571681;
-
-    private final BufferedOutputStream fileOutputStream;
-    private final int blockSize;
-    private final BlockWriter dataBlockWriter;
-    private final BlockWriter indexBlockWriter;
-    @Nullable private final BloomFilter.Builder bloomFilter;
-    private final BlockCompressionType compressionType;
-    @Nullable private final BlockCompressor blockCompressor;
-
-    private byte[] lastKey;
-    private long position;
-
-    private long recordCount;
-    private long totalUncompressedSize;
-    private long totalCompressedSize;
-
-    SortLookupStoreWriter(
+    public SortLookupStoreWriter(
             File file,
             int blockSize,
             @Nullable BloomFilter.Builder bloomFilter,
-            @Nullable BlockCompressionFactory compressionFactory)
+            BlockCompressionFactory compressionFactory)
             throws IOException {
-        this.fileOutputStream = new 
BufferedOutputStream(Files.newOutputStream(file.toPath()));
-        this.blockSize = blockSize;
-        this.dataBlockWriter = new BlockWriter((int) (blockSize * 1.1));
-        int expectedNumberOfBlocks = 1024;
-        this.indexBlockWriter =
-                new BlockWriter(BlockHandle.MAX_ENCODED_LENGTH * 
expectedNumberOfBlocks);
-        this.bloomFilter = bloomFilter;
-        if (compressionFactory == null) {
-            this.compressionType = BlockCompressionType.NONE;
-            this.blockCompressor = null;
-        } else {
-            this.compressionType = compressionFactory.getCompressionType();
-            this.blockCompressor = compressionFactory.getCompressor();
-        }
+        final Path filePath = new Path(file.getAbsolutePath());
+        this.fileIO = LocalFileIO.create();
+        this.out = fileIO.newOutputStream(filePath, true);
+        this.sstFileWriter = new SstFileWriter(out, blockSize, bloomFilter, 
compressionFactory);
     }
 
     @Override
     public void put(byte[] key, byte[] value) throws IOException {
-        dataBlockWriter.add(key, value);
-        if (bloomFilter != null) {
-            bloomFilter.addHash(MurmurHashUtils.hashBytes(key));
-        }
-
-        lastKey = key;
-
-        if (dataBlockWriter.memory() > blockSize) {
-            flush();
-        }
-
-        recordCount++;
-    }
-
-    private void flush() throws IOException {
-        if (dataBlockWriter.size() == 0) {
-            return;
-        }
-
-        BlockHandle blockHandle = writeBlock(dataBlockWriter);
-        MemorySlice handleEncoding = writeBlockHandle(blockHandle);
-        indexBlockWriter.add(lastKey, handleEncoding.copyBytes());
-    }
-
-    private BlockHandle writeBlock(BlockWriter blockWriter) throws IOException 
{
-        // close the block
-        MemorySlice block = blockWriter.finish();
-
-        totalUncompressedSize += block.length();
-
-        // attempt to compress the block
-        BlockCompressionType blockCompressionType = BlockCompressionType.NONE;
-        if (blockCompressor != null) {
-            int maxCompressedSize = 
blockCompressor.getMaxCompressedSize(block.length());
-            byte[] compressed = allocateReuseBytes(maxCompressedSize + 5);
-            int offset = encodeInt(compressed, 0, block.length());
-            int compressedSize =
-                    offset
-                            + blockCompressor.compress(
-                                    block.getHeapMemory(),
-                                    block.offset(),
-                                    block.length(),
-                                    compressed,
-                                    offset);
-
-            // Don't use the compressed data if compressed less than 12.5%,
-            if (compressedSize < block.length() - (block.length() / 8)) {
-                block = new MemorySlice(MemorySegment.wrap(compressed), 0, 
compressedSize);
-                blockCompressionType = this.compressionType;
-            }
-        }
-
-        totalCompressedSize += block.length();
-
-        // create block trailer
-        BlockTrailer blockTrailer =
-                new BlockTrailer(blockCompressionType, crc32c(block, 
blockCompressionType));
-        MemorySlice trailer = BlockTrailer.writeBlockTrailer(blockTrailer);
-
-        // create a handle to this block
-        BlockHandle blockHandle = new BlockHandle(position, block.length());
-
-        // write data
-        writeSlice(block);
-
-        // write trailer: 5 bytes
-        writeSlice(trailer);
-
-        // clean up state
-        blockWriter.reset();
-
-        return blockHandle;
+        sstFileWriter.put(key, value);
     }
 
     @Override
     public void close() throws IOException {
-        // flush current data block
-        flush();
-
-        LOG.info("Number of record: {}", recordCount);
-
-        // write bloom filter
-        @Nullable BloomFilterHandle bloomFilterHandle = null;
-        if (bloomFilter != null) {
-            MemorySegment buffer = bloomFilter.getBuffer();
-            bloomFilterHandle =
-                    new BloomFilterHandle(position, buffer.size(), 
bloomFilter.expectedEntries());
-            writeSlice(MemorySlice.wrap(buffer));
-            LOG.info("Bloom filter size: {} bytes", 
bloomFilter.getBuffer().size());
-        }
-
-        // write index block
-        BlockHandle indexBlockHandle = writeBlock(indexBlockWriter);
-
-        // write footer
-        Footer footer = new Footer(bloomFilterHandle, indexBlockHandle);
-        MemorySlice footerEncoding = Footer.writeFooter(footer);
-        writeSlice(footerEncoding);
-
-        // close file
-        fileOutputStream.close();
-
-        LOG.info("totalUncompressedSize: {}", 
MemorySize.ofBytes(totalUncompressedSize));
-        LOG.info("totalCompressedSize: {}", 
MemorySize.ofBytes(totalCompressedSize));
-    }
-
-    private void writeSlice(MemorySlice slice) throws IOException {
-        fileOutputStream.write(slice.getHeapMemory(), slice.offset(), 
slice.length());
-        position += slice.length();
+        sstFileWriter.close();
+        out.close();
+        fileIO.close();
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java
 b/paimon-common/src/main/java/org/apache/paimon/sst/BlockAlignedType.java
similarity index 97%
rename from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java
rename to 
paimon-common/src/main/java/org/apache/paimon/sst/BlockAlignedType.java
index e5849d9f75..43387c9d63 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockAlignedType.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 /** Aligned type for block. */
 public enum BlockAlignedType {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java 
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
similarity index 80%
rename from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
index 0441a24f22..fbeeba2e77 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
@@ -16,18 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.io.cache.CacheKey;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.io.cache.CacheManager.SegmentContainer;
 import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.utils.IOUtils;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -37,32 +37,29 @@ import java.util.function.Function;
 /** Cache for block reading. */
 public class BlockCache implements Closeable {
 
-    private final RandomAccessFile file;
-    private final FileChannel channel;
+    private final Path filePath;
+    private final SeekableInputStream input;
     private final CacheManager cacheManager;
     private final Map<CacheKey, SegmentContainer> blocks;
 
-    public BlockCache(RandomAccessFile file, CacheManager cacheManager) {
-        this.file = file;
-        this.channel = this.file.getChannel();
+    public BlockCache(Path filePath, SeekableInputStream input, CacheManager 
cacheManager) {
+        this.filePath = filePath;
+        this.input = input;
         this.cacheManager = cacheManager;
         this.blocks = new HashMap<>();
     }
 
     private byte[] readFrom(long offset, int length) throws IOException {
         byte[] buffer = new byte[length];
-        int read = channel.read(ByteBuffer.wrap(buffer), offset);
-
-        if (read != length) {
-            throw new IOException("Could not read all the data");
-        }
+        input.seek(offset);
+        IOUtils.readFully(input, buffer);
         return buffer;
     }
 
     public MemorySegment getBlock(
             long position, int length, Function<byte[], byte[]> 
decompressFunc, boolean isIndex) {
 
-        CacheKey cacheKey = CacheKey.forPosition(file, position, length, 
isIndex);
+        CacheKey cacheKey = CacheKey.forPosition(filePath, position, length, 
isIndex);
 
         SegmentContainer container = blocks.get(cacheKey);
         if (container == null || container.getAccessCount() == 
CacheManager.REFRESH_COUNT) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java 
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockEntry.java
similarity index 98%
rename from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockEntry.java
index 4473886e31..efb82e9fb6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockEntry.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import org.apache.paimon.memory.MemorySlice;
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java 
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java
similarity index 98%
rename from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java
index 737f57a8b0..60d4c5929c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.memory.MemorySliceInput;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java 
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java
similarity index 98%
rename from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java
index 0ca04e91d7..65913957e3 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.memory.MemorySliceInput;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java 
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java
similarity index 96%
rename from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java
index a5bbef74ec..d7596bafa4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java
@@ -16,13 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import org.apache.paimon.memory.MemorySlice;
 
 import java.util.Comparator;
 
-import static org.apache.paimon.lookup.sort.BlockAlignedType.ALIGNED;
+import static org.apache.paimon.sst.BlockAlignedType.ALIGNED;
 
 /** Reader for a block. */
 public class BlockReader {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java 
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockTrailer.java
similarity index 98%
rename from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockTrailer.java
index 6d49bd9cc5..05d310fa12 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockTrailer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import org.apache.paimon.compression.BlockCompressionType;
 import org.apache.paimon.memory.MemorySlice;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java 
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockWriter.java
similarity index 69%
rename from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockWriter.java
index 340abac862..8823f47385 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.memory.MemorySliceOutput;
@@ -24,10 +24,34 @@ import org.apache.paimon.utils.IntArrayList;
 
 import java.io.IOException;
 
-import static org.apache.paimon.lookup.sort.BlockAlignedType.ALIGNED;
-import static org.apache.paimon.lookup.sort.BlockAlignedType.UNALIGNED;
+import static org.apache.paimon.sst.BlockAlignedType.ALIGNED;
+import static org.apache.paimon.sst.BlockAlignedType.UNALIGNED;
 
-/** Writer to build a Block. */
+/**
+ * Writer to build a Block. A block is designed for storing and 
random-accessing k-v pairs. The
+ * layout is as below:
+ *
+ * <pre>
+ *     +---------------+
+ *     | Block Trailer |
+ *     +------------------------------------------------+
+ *     |       Block CRC32C      |     Compression      |
+ *     +------------------------------------------------+
+ *     +---------------+
+ *     |  Block Data   |
+ *     +---------------+--------------------------------+----+
+ *     | key len | key bytes | value len | value bytes  |    |
+ *     +------------------------------------------------+    |
+ *     | key len | key bytes | value len | value bytes  |    +-> Key-Value 
pairs
+ *     +------------------------------------------------+    |
+ *     |                  ... ...                       |    |
+ *     +------------------------------------------------+----+
+ *     | entry pos | entry pos |     ...    | entry pos |    +-> optional, for 
unaligned block
+ *     +------------------------------------------------+----+
+ *     |   entry num  /  entry size   |   aligned type  |
+ *     +------------------------------------------------+
+ * </pre>
+ */
 public class BlockWriter {
 
     private final IntArrayList positions;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java
 b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
similarity index 98%
rename from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java
rename to 
paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
index 7ec6a845c5..7f3804dd8d 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import java.util.Objects;
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java 
b/paimon-common/src/main/java/org/apache/paimon/sst/Footer.java
similarity index 96%
rename from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/Footer.java
index b8ae517891..7855d18a94 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/Footer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.memory.MemorySliceInput;
@@ -26,7 +26,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 
-import static org.apache.paimon.lookup.sort.SortLookupStoreWriter.MAGIC_NUMBER;
+import static org.apache.paimon.sst.SstFileWriter.MAGIC_NUMBER;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Footer for a sorted file. */
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java 
b/paimon-common/src/main/java/org/apache/paimon/sst/SortContext.java
similarity index 96%
rename from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/SortContext.java
index 5aacb56bb7..797def3fdb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SortContext.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import org.apache.paimon.lookup.LookupStoreFactory.Context;
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
 b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
similarity index 85%
copy from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
copy to paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
index fd068baaa2..cf10e13a05 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
@@ -16,13 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.compression.BlockDecompressor;
-import org.apache.paimon.io.PageFileInput;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.io.cache.CacheManager;
-import org.apache.paimon.lookup.LookupStoreReader;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.memory.MemorySliceInput;
@@ -31,41 +31,48 @@ import org.apache.paimon.utils.MurmurHashUtils;
 
 import javax.annotation.Nullable;
 
-import java.io.File;
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Comparator;
 
-import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c;
+import static org.apache.paimon.sst.SstFileUtils.crc32c;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** A {@link LookupStoreReader} for sort store. */
-public class SortLookupStoreReader implements LookupStoreReader {
+/**
+ * An SST File Reader which only serves point queries now.
+ *
+ * <p>Note that this class is NOT thread-safe.
+ */
+public class SstFileReader implements Closeable {
 
     private final Comparator<MemorySlice> comparator;
-    private final String filePath;
+    private final Path filePath;
     private final long fileSize;
 
     private final BlockIterator indexBlockIterator;
     @Nullable private FileBasedBloomFilter bloomFilter;
     private final BlockCache blockCache;
-    private final PageFileInput fileInput;
 
-    public SortLookupStoreReader(
-            Comparator<MemorySlice> comparator, File file, int blockSize, 
CacheManager cacheManager)
+    public SstFileReader(
+            Comparator<MemorySlice> comparator,
+            long fileSize,
+            Path filePath,
+            SeekableInputStream input,
+            CacheManager cacheManager)
             throws IOException {
         this.comparator = comparator;
-        this.filePath = file.getAbsolutePath();
-        this.fileSize = file.length();
+        this.filePath = filePath;
+        this.fileSize = fileSize;
 
-        this.fileInput = PageFileInput.create(file, blockSize, null, fileSize, 
null);
-        this.blockCache = new BlockCache(fileInput.file(), cacheManager);
+        this.blockCache = new BlockCache(filePath, input, cacheManager);
         Footer footer = readFooter();
         this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(), 
true).iterator();
         BloomFilterHandle handle = footer.getBloomFilterHandle();
         if (handle != null) {
             this.bloomFilter =
                     new FileBasedBloomFilter(
-                            fileInput,
+                            input,
+                            filePath,
                             cacheManager,
                             handle.expectedEntries(),
                             handle.offset(),
@@ -80,8 +87,13 @@ public class SortLookupStoreReader implements 
LookupStoreReader {
         return Footer.readFooter(MemorySlice.wrap(footerData).toInput());
     }
 
+    /**
+     * Lookup the specified key in the file.
+     *
+     * @param key serialized key
+     * @return corresponding serialized value, null if not found.
+     */
     @Nullable
-    @Override
     public byte[] lookup(byte[] key) throws IOException {
         if (bloomFilter != null && 
!bloomFilter.testHash(MurmurHashUtils.hashBytes(key))) {
             return null;
@@ -171,6 +183,6 @@ public class SortLookupStoreReader implements 
LookupStoreReader {
             bloomFilter.close();
         }
         blockCache.close();
-        fileInput.close();
+        // do not need to close input, since it will be closed by outer classes
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreUtils.java
 b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileUtils.java
similarity index 95%
rename from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreUtils.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/SstFileUtils.java
index 8339ac01f8..c1a15ab91b 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import org.apache.paimon.compression.BlockCompressionType;
 import org.apache.paimon.memory.MemorySegment;
@@ -25,7 +25,7 @@ import org.apache.paimon.memory.MemorySlice;
 import java.util.zip.CRC32;
 
 /** Utils for sort lookup store. */
-public class SortLookupStoreUtils {
+public class SstFileUtils {
     public static int crc32c(MemorySlice data, BlockCompressionType type) {
         CRC32 crc = new CRC32();
         crc.update(data.getHeapMemory(), data.offset(), data.length());
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
 b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
similarity index 76%
copy from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
copy to paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
index 31f09cfbc6..8eb234b564 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.compression.BlockCompressionType;
 import org.apache.paimon.compression.BlockCompressor;
-import org.apache.paimon.lookup.LookupStoreWriter;
+import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.options.MemorySize;
@@ -33,25 +33,42 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.BufferedOutputStream;
-import java.io.File;
+import java.io.Closeable;
 import java.io.IOException;
-import java.nio.file.Files;
 
-import static org.apache.paimon.lookup.sort.BlockHandle.writeBlockHandle;
-import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c;
 import static org.apache.paimon.memory.MemorySegmentUtils.allocateReuseBytes;
+import static org.apache.paimon.sst.BlockHandle.writeBlockHandle;
+import static org.apache.paimon.sst.SstFileUtils.crc32c;
 import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
 
-/** A {@link LookupStoreWriter} for sorting. */
-public class SortLookupStoreWriter implements LookupStoreWriter {
+/**
+ * The writer for writing SST Files. SST Files are row-oriented and designed 
to serve frequent point
+ * queries and range queries by key. The SST File layout is as below: (For 
layouts of each block
+ * type, please refer to corresponding classes)
+ *
+ * <pre>
+ *     +-----------------------------------+------+
+ *     |             Footer                |      |
+ *     +-----------------------------------+      |
+ *     |           Index Block             |      +--> Loaded on open
+ *     +-----------------------------------+      |
+ *     |        Bloom Filter Block         |      |
+ *     +-----------------------------------+------+
+ *     |            Data Block             |      |
+ *     +-----------------------------------+      |
+ *     |              ......               |      +--> Loaded on request
+ *     +-----------------------------------+      |
+ *     |            Data Block             |      |
+ *     +-----------------------------------+------+
+ * </pre>
+ */
+public class SstFileWriter implements Closeable {
 
-    private static final Logger LOG =
-            LoggerFactory.getLogger(SortLookupStoreWriter.class.getName());
+    private static final Logger LOG = 
LoggerFactory.getLogger(SstFileWriter.class.getName());
 
     public static final int MAGIC_NUMBER = 1481571681;
 
-    private final BufferedOutputStream fileOutputStream;
+    private final PositionOutputStream out;
     private final int blockSize;
     private final BlockWriter dataBlockWriter;
     private final BlockWriter indexBlockWriter;
@@ -66,13 +83,13 @@ public class SortLookupStoreWriter implements 
LookupStoreWriter {
     private long totalUncompressedSize;
     private long totalCompressedSize;
 
-    SortLookupStoreWriter(
-            File file,
+    public SstFileWriter(
+            PositionOutputStream out,
             int blockSize,
             @Nullable BloomFilter.Builder bloomFilter,
             @Nullable BlockCompressionFactory compressionFactory)
             throws IOException {
-        this.fileOutputStream = new 
BufferedOutputStream(Files.newOutputStream(file.toPath()));
+        this.out = out;
         this.blockSize = blockSize;
         this.dataBlockWriter = new BlockWriter((int) (blockSize * 1.1));
         int expectedNumberOfBlocks = 1024;
@@ -88,7 +105,14 @@ public class SortLookupStoreWriter implements 
LookupStoreWriter {
         }
     }
 
-    @Override
+    /**
+     * Put the serialized key and value into this SST File. The caller must 
guarantee that the input
+     * key is monotonically increasing according to {@link SstFileReader}'s 
comparator. Otherwise,
+     * the lookup and range query result will be undefined.
+     *
+     * @param key serialized key
+     * @param value serialized value
+     */
     public void put(byte[] key, byte[] value) throws IOException {
         dataBlockWriter.add(key, value);
         if (bloomFilter != null) {
@@ -189,15 +213,14 @@ public class SortLookupStoreWriter implements 
LookupStoreWriter {
         MemorySlice footerEncoding = Footer.writeFooter(footer);
         writeSlice(footerEncoding);
 
-        // close file
-        fileOutputStream.close();
+        // do not need to close outputStream, since it will be closed by outer 
classes
 
         LOG.info("totalUncompressedSize: {}", 
MemorySize.ofBytes(totalUncompressedSize));
         LOG.info("totalCompressedSize: {}", 
MemorySize.ofBytes(totalCompressedSize));
     }
 
     private void writeSlice(MemorySlice slice) throws IOException {
-        fileOutputStream.write(slice.getHeapMemory(), slice.offset(), 
slice.length());
+        out.write(slice.getHeapMemory(), slice.offset(), slice.length());
         position += slice.length();
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
index ede7a8e3cf..693c3d2bc7 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
@@ -19,7 +19,8 @@
 package org.apache.paimon.utils;
 
 import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.io.PageFileInput;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.io.cache.CacheCallback;
 import org.apache.paimon.io.cache.CacheKey;
 import org.apache.paimon.io.cache.CacheManager;
@@ -34,16 +35,18 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 /** Util to apply a built bloom filter . */
 public class FileBasedBloomFilter implements Closeable {
 
-    private final PageFileInput input;
+    private final SeekableInputStream input;
     private final CacheManager cacheManager;
     private final BloomFilter filter;
     private final long readOffset;
-    private final int readLength;
     private final CacheKey cacheKey;
+    // each bloom filter is only used by a single file reader, so we can 
safely reuse here
+    private final byte[] reusedPageBuffer;
     private int accessCount;
 
     public FileBasedBloomFilter(
-            PageFileInput input,
+            SeekableInputStream input,
+            Path filePath,
             CacheManager cacheManager,
             long expectedEntries,
             long readOffset,
@@ -53,9 +56,9 @@ public class FileBasedBloomFilter implements Closeable {
         checkArgument(expectedEntries >= 0);
         this.filter = new BloomFilter(expectedEntries, readLength);
         this.readOffset = readOffset;
-        this.readLength = readLength;
         this.accessCount = 0;
-        this.cacheKey = CacheKey.forPosition(input.file(), readOffset, 
readLength, true);
+        this.cacheKey = CacheKey.forPosition(filePath, readOffset, readLength, 
true);
+        this.reusedPageBuffer = new byte[readLength];
     }
 
     public boolean testHash(int hash) {
@@ -65,15 +68,19 @@ public class FileBasedBloomFilter implements Closeable {
         if (accessCount == REFRESH_COUNT || filter.getMemorySegment() == null) 
{
             MemorySegment segment =
                     cacheManager.getPage(
-                            cacheKey,
-                            key -> input.readPosition(readOffset, readLength),
-                            new BloomFilterCallBack(filter));
+                            cacheKey, key -> readPage(), new 
BloomFilterCallBack(filter));
             filter.setMemorySegment(segment, 0);
             accessCount = 0;
         }
         return filter.testHash(hash);
     }
 
+    private byte[] readPage() throws IOException {
+        input.seek(readOffset);
+        IOUtils.readFully(input, reusedPageBuffer);
+        return reusedPageBuffer;
+    }
+
     @VisibleForTesting
     BloomFilter bloomFilter() {
         return filter;
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/BlockIteratorTest.java
 b/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
similarity index 99%
rename from 
paimon-common/src/test/java/org/apache/paimon/lookup/sort/BlockIteratorTest.java
rename to 
paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
index 71c75ff95a..004b920a4e 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/BlockIteratorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
 
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.memory.MemorySliceOutput;
diff --git a/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java 
b/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java
new file mode 100644
index 0000000000..22aa27feb3
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.sst;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceOutput;
+import org.apache.paimon.options.MemorySize;
+import 
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
+import org.apache.paimon.testutils.junit.parameterized.Parameters;
+import org.apache.paimon.utils.BloomFilter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/** Test for {@link SstFileReader} and {@link SstFileWriter}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class SstFileTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SstFileTest.class);
+
+    // 256 records per block
+    private static final int BLOCK_SIZE = (10) * 256;
+    private static final CacheManager CACHE_MANAGER = new 
CacheManager(MemorySize.ofMebiBytes(10));
+    @TempDir java.nio.file.Path tempPath;
+
+    private final boolean bloomFilterEnabled;
+    private final CompressOptions compress;
+
+    private FileIO fileIO;
+    private Path file;
+    private Path parent;
+
+    public SstFileTest(List<Object> var) {
+        this.bloomFilterEnabled = (Boolean) var.get(0);
+        this.compress = new CompressOptions((String) var.get(1), 1);
+    }
+
+    @SuppressWarnings("unused")
+    @Parameters(name = "enableBf&compress-{0}")
+    public static List<List<Object>> getVarSeg() {
+        return Arrays.asList(
+                Arrays.asList(true, "none"),
+                Arrays.asList(false, "none"),
+                Arrays.asList(false, "lz4"),
+                Arrays.asList(true, "lz4"),
+                Arrays.asList(false, "zstd"),
+                Arrays.asList(true, "zstd"));
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        this.fileIO = LocalFileIO.create();
+        this.parent = new Path(tempPath.toUri());
+        this.file = new Path(new Path(tempPath.toUri()), 
UUID.randomUUID().toString());
+    }
+
+    @TestTemplate
+    public void testLookup() throws Exception {
+        writeData(5000, bloomFilterEnabled);
+        innerTestLookup();
+    }
+
+    private void innerTestLookup() throws Exception {
+        long fileSize = fileIO.getFileSize(file);
+        try (SeekableInputStream inputStream = fileIO.newInputStream(file);
+                SstFileReader reader =
+                        new SstFileReader(
+                                Comparator.comparingInt(slice -> 
slice.readInt(0)),
+                                fileSize,
+                                file,
+                                inputStream,
+                                CACHE_MANAGER); ) {
+            Random random = new Random();
+            MemorySliceOutput keyOut = new MemorySliceOutput(4);
+
+            // 1. lookup random existing keys
+            for (int i = 0; i < 100; i++) {
+                int key = random.nextInt(5000);
+                keyOut.reset();
+                keyOut.writeInt(key);
+                byte[] queried = 
reader.lookup(keyOut.toSlice().getHeapMemory());
+                Assertions.assertNotNull(queried);
+                Assertions.assertEquals(key, 
MemorySlice.wrap(queried).readInt(0));
+            }
+
+            // 2. lookup boundaries
+            keyOut.reset();
+            keyOut.writeInt(0);
+            byte[] queried = reader.lookup(keyOut.toSlice().getHeapMemory());
+            Assertions.assertNotNull(queried);
+            Assertions.assertEquals(0, MemorySlice.wrap(queried).readInt(0));
+
+            keyOut.reset();
+            keyOut.writeInt(511);
+            byte[] queried1 = reader.lookup(keyOut.toSlice().getHeapMemory());
+            Assertions.assertNotNull(queried1);
+            Assertions.assertEquals(511, 
MemorySlice.wrap(queried1).readInt(0));
+
+            keyOut.reset();
+            keyOut.writeInt(4999);
+            byte[] queried2 = reader.lookup(keyOut.toSlice().getHeapMemory());
+            Assertions.assertNotNull(queried2);
+            Assertions.assertEquals(4999, 
MemorySlice.wrap(queried2).readInt(0));
+
+            // 2. lookup key smaller than first key
+            for (int i = 0; i < 100; i++) {
+                keyOut.reset();
+                keyOut.writeInt(-10 - i);
+                
Assertions.assertNull(reader.lookup(keyOut.toSlice().getHeapMemory()));
+            }
+
+            // 3. lookup key greater than last key
+            for (int i = 0; i < 100; i++) {
+                keyOut.reset();
+                keyOut.writeInt(10000 + i);
+                
Assertions.assertNull(reader.lookup(keyOut.toSlice().getHeapMemory()));
+            }
+        }
+    }
+
+    private void writeData(int recordCount, boolean withBloomFilter) throws 
Exception {
+        BloomFilter.Builder bloomFilter = null;
+        if (withBloomFilter) {
+            bloomFilter = BloomFilter.builder(recordCount, 0.05);
+        }
+        BlockCompressionFactory compressionFactory = 
BlockCompressionFactory.create(compress);
+        try (PositionOutputStream outputStream = fileIO.newOutputStream(file, 
true);
+                SstFileWriter writer =
+                        new SstFileWriter(
+                                outputStream, BLOCK_SIZE, bloomFilter, 
compressionFactory); ) {
+            MemorySliceOutput keyOut = new MemorySliceOutput(4);
+            MemorySliceOutput valueOut = new MemorySliceOutput(4);
+            long start = System.currentTimeMillis();
+            for (int i = 0; i < recordCount; i++) {
+                keyOut.reset();
+                valueOut.reset();
+                keyOut.writeInt(i);
+                valueOut.writeInt(i);
+                writer.put(keyOut.toSlice().getHeapMemory(), 
valueOut.toSlice().getHeapMemory());
+            }
+            LOG.info("Write {} data cost {} ms", recordCount, 
System.currentTimeMillis() - start);
+        }
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
index d1471fd74a..8ff1da558c 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
@@ -18,7 +18,8 @@
 
 package org.apache.paimon.utils;
 
-import org.apache.paimon.io.PageFileInput;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.cache.Cache;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.MemorySegment;
@@ -62,16 +63,14 @@ public class FileBasedBloomFilterTest {
         BloomFilter.Builder builder = new BloomFilter.Builder(segment, 100);
         int[] inputs = CommonTestUtils.generateRandomInts(100);
         Arrays.stream(inputs).forEach(i -> 
builder.addHash(Integer.hashCode(i)));
-        File file = writeFile(segment.getArray());
+        org.apache.paimon.fs.Path filePath =
+                new 
org.apache.paimon.fs.Path(writeFile(segment.getArray()).getAbsolutePath());
+        FileIO fileIO = LocalFileIO.create();
 
         CacheManager cacheManager = new CacheManager(cacheType, 
MemorySize.ofMebiBytes(1), 0.1);
         FileBasedBloomFilter filter =
                 new FileBasedBloomFilter(
-                        PageFileInput.create(file, 1024, null, 0, null),
-                        cacheManager,
-                        100,
-                        0,
-                        1000);
+                        fileIO.newInputStream(filePath), filePath, 
cacheManager, 100, 0, 1000);
 
         Arrays.stream(inputs)
                 .forEach(i -> 
Assertions.assertThat(filter.testHash(Integer.hashCode(i))).isTrue());

Reply via email to