This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 432fad719e [core] Extract an SST File Format from LookupStore. (#6755)
432fad719e is described below
commit 432fad719ec93bcc902a16e3b31f374be473d339
Author: Faiz <[email protected]>
AuthorDate: Sat Dec 6 20:04:14 2025 +0800
[core] Extract an SST File Format from LookupStore. (#6755)
---
.../java/org/apache/paimon/io/cache/CacheKey.java | 19 +--
.../paimon/lookup/sort/SortLookupStoreFactory.java | 2 +-
.../paimon/lookup/sort/SortLookupStoreReader.java | 150 +++--------------
.../paimon/lookup/sort/SortLookupStoreWriter.java | 177 +++-----------------
.../{lookup/sort => sst}/BlockAlignedType.java | 2 +-
.../paimon/{lookup/sort => sst}/BlockCache.java | 27 ++--
.../paimon/{lookup/sort => sst}/BlockEntry.java | 2 +-
.../paimon/{lookup/sort => sst}/BlockHandle.java | 2 +-
.../paimon/{lookup/sort => sst}/BlockIterator.java | 2 +-
.../paimon/{lookup/sort => sst}/BlockReader.java | 4 +-
.../paimon/{lookup/sort => sst}/BlockTrailer.java | 2 +-
.../paimon/{lookup/sort => sst}/BlockWriter.java | 32 +++-
.../{lookup/sort => sst}/BloomFilterHandle.java | 2 +-
.../apache/paimon/{lookup/sort => sst}/Footer.java | 4 +-
.../paimon/{lookup/sort => sst}/SortContext.java | 2 +-
.../SstFileReader.java} | 48 +++---
.../SstFileUtils.java} | 4 +-
.../SstFileWriter.java} | 61 ++++---
.../apache/paimon/utils/FileBasedBloomFilter.java | 25 +--
.../{lookup/sort => sst}/BlockIteratorTest.java | 2 +-
.../java/org/apache/paimon/sst/SstFileTest.java | 178 +++++++++++++++++++++
.../paimon/utils/FileBasedBloomFilterTest.java | 13 +-
22 files changed, 376 insertions(+), 384 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
index 11b8beb22c..74bca4c593 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
@@ -18,14 +18,16 @@
package org.apache.paimon.io.cache;
+import org.apache.paimon.fs.Path;
+
import java.io.RandomAccessFile;
import java.util.Objects;
/** Key for cache manager. */
public interface CacheKey {
- static CacheKey forPosition(RandomAccessFile file, long position, int
length, boolean isIndex) {
- return new PositionCacheKey(file, position, length, isIndex);
+ static CacheKey forPosition(Path filePath, long position, int length,
boolean isIndex) {
+ return new PositionCacheKey(filePath, position, length, isIndex);
}
static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int
pageIndex) {
@@ -35,17 +37,16 @@ public interface CacheKey {
/** @return Whether this cache key is for index cache. */
boolean isIndex();
- /** Key for file position and length. */
+ /** Key for a byte range (position and length) within a file identified by its path, which may be remote. */
class PositionCacheKey implements CacheKey {
- private final RandomAccessFile file;
+ private final Path filePath;
private final long position;
private final int length;
private final boolean isIndex;
- private PositionCacheKey(
- RandomAccessFile file, long position, int length, boolean
isIndex) {
- this.file = file;
+ private PositionCacheKey(Path filePath, long position, int length,
boolean isIndex) {
+ this.filePath = filePath;
this.position = position;
this.length = length;
this.isIndex = isIndex;
@@ -63,12 +64,12 @@ public interface CacheKey {
return position == that.position
&& length == that.length
&& isIndex == that.isIndex
- && Objects.equals(file, that.file);
+ && Objects.equals(filePath, that.filePath);
}
@Override
public int hashCode() {
- return Objects.hash(file, position, length, isIndex);
+ return Objects.hash(filePath, position, length, isIndex);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
index 7dcacc9ad1..cbf01d590e 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
@@ -52,7 +52,7 @@ public class SortLookupStoreFactory implements
LookupStoreFactory {
@Override
public SortLookupStoreReader createReader(File file) throws IOException {
- return new SortLookupStoreReader(comparator, file, blockSize,
cacheManager);
+ return new SortLookupStoreReader(comparator, file, cacheManager);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
index fd068baaa2..684273ae15 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
@@ -18,16 +18,14 @@
package org.apache.paimon.lookup.sort;
-import org.apache.paimon.compression.BlockCompressionFactory;
-import org.apache.paimon.compression.BlockDecompressor;
-import org.apache.paimon.io.PageFileInput;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreReader;
-import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySlice;
-import org.apache.paimon.memory.MemorySliceInput;
-import org.apache.paimon.utils.FileBasedBloomFilter;
-import org.apache.paimon.utils.MurmurHashUtils;
+import org.apache.paimon.sst.SstFileReader;
import javax.annotation.Nullable;
@@ -35,142 +33,34 @@ import java.io.File;
import java.io.IOException;
import java.util.Comparator;
-import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/** A {@link LookupStoreReader} for sort store. */
+/** A {@link LookupStoreReader} backed by an {@link SstFileReader}. */
public class SortLookupStoreReader implements LookupStoreReader {
- private final Comparator<MemorySlice> comparator;
- private final String filePath;
- private final long fileSize;
-
- private final BlockIterator indexBlockIterator;
- @Nullable private FileBasedBloomFilter bloomFilter;
- private final BlockCache blockCache;
- private final PageFileInput fileInput;
+ private final FileIO fileIO;
+ private final SeekableInputStream input;
+ private final SstFileReader sstFileReader;
public SortLookupStoreReader(
- Comparator<MemorySlice> comparator, File file, int blockSize,
CacheManager cacheManager)
+ Comparator<MemorySlice> comparator, File file, CacheManager
cacheManager)
throws IOException {
- this.comparator = comparator;
- this.filePath = file.getAbsolutePath();
- this.fileSize = file.length();
-
- this.fileInput = PageFileInput.create(file, blockSize, null, fileSize,
null);
- this.blockCache = new BlockCache(fileInput.file(), cacheManager);
- Footer footer = readFooter();
- this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(),
true).iterator();
- BloomFilterHandle handle = footer.getBloomFilterHandle();
- if (handle != null) {
- this.bloomFilter =
- new FileBasedBloomFilter(
- fileInput,
- cacheManager,
- handle.expectedEntries(),
- handle.offset(),
- handle.size());
- }
- }
-
- private Footer readFooter() throws IOException {
- MemorySegment footerData =
- blockCache.getBlock(
- fileSize - Footer.ENCODED_LENGTH,
Footer.ENCODED_LENGTH, b -> b, true);
- return Footer.readFooter(MemorySlice.wrap(footerData).toInput());
+ final Path filePath = new Path(file.getAbsolutePath());
+ this.fileIO = LocalFileIO.create();
+ this.input = fileIO.newInputStream(filePath);
+ this.sstFileReader =
+ new SstFileReader(comparator, file.length(), filePath, input,
cacheManager);
}
@Nullable
@Override
public byte[] lookup(byte[] key) throws IOException {
- if (bloomFilter != null &&
!bloomFilter.testHash(MurmurHashUtils.hashBytes(key))) {
- return null;
- }
-
- MemorySlice keySlice = MemorySlice.wrap(key);
- // seek the index to the block containing the key
- indexBlockIterator.seekTo(keySlice);
-
- // if indexIterator does not have a next, it means the key does not
exist in this iterator
- if (indexBlockIterator.hasNext()) {
- // seek the current iterator to the key
- BlockIterator current = getNextBlock();
- if (current.seekTo(keySlice)) {
- return current.next().getValue().copyBytes();
- }
- }
- return null;
- }
-
- private BlockIterator getNextBlock() {
- // index block handle, point to the key, value position.
- MemorySlice blockHandle = indexBlockIterator.next().getValue();
- BlockReader dataBlock =
- readBlock(BlockHandle.readBlockHandle(blockHandle.toInput()),
false);
- return dataBlock.iterator();
- }
-
- /**
- * @param blockHandle The block handle.
- * @param index Whether read the block as an index.
- * @return The reader of the target block.
- */
- private BlockReader readBlock(BlockHandle blockHandle, boolean index) {
- // read block trailer
- MemorySegment trailerData =
- blockCache.getBlock(
- blockHandle.offset() + blockHandle.size(),
- BlockTrailer.ENCODED_LENGTH,
- b -> b,
- true);
- BlockTrailer blockTrailer =
-
BlockTrailer.readBlockTrailer(MemorySlice.wrap(trailerData).toInput());
-
- MemorySegment unCompressedBlock =
- blockCache.getBlock(
- blockHandle.offset(),
- blockHandle.size(),
- bytes -> decompressBlock(bytes, blockTrailer),
- index);
- return new BlockReader(MemorySlice.wrap(unCompressedBlock),
comparator);
- }
-
- private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer
blockTrailer) {
- MemorySegment compressed = MemorySegment.wrap(compressedBytes);
- int crc32cCode = crc32c(compressed, blockTrailer.getCompressionType());
- checkArgument(
- blockTrailer.getCrc32c() == crc32cCode,
- String.format(
- "Expected CRC32C(%d) but found CRC32C(%d) for
file(%s)",
- blockTrailer.getCrc32c(), crc32cCode, filePath));
-
- // decompress data
- BlockCompressionFactory compressionFactory =
-
BlockCompressionFactory.create(blockTrailer.getCompressionType());
- if (compressionFactory == null) {
- return compressedBytes;
- } else {
- MemorySliceInput compressedInput =
MemorySlice.wrap(compressed).toInput();
- byte[] uncompressed = new byte[compressedInput.readVarLenInt()];
- BlockDecompressor decompressor =
compressionFactory.getDecompressor();
- int uncompressedLength =
- decompressor.decompress(
- compressed.getHeapMemory(),
- compressedInput.position(),
- compressedInput.available(),
- uncompressed,
- 0);
- checkArgument(uncompressedLength == uncompressed.length);
- return uncompressed;
- }
+ return sstFileReader.lookup(key);
}
@Override
public void close() throws IOException {
- if (bloomFilter != null) {
- bloomFilter.close();
- }
- blockCache.close();
- fileInput.close();
+ // close in reverse order of creation: sstFileReader reads through input, which was opened from fileIO
+ sstFileReader.close();
+ input.close();
+ fileIO.close();
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
index 31f09cfbc6..366e4560ac 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
@@ -19,185 +19,46 @@
package org.apache.paimon.lookup.sort;
import org.apache.paimon.compression.BlockCompressionFactory;
-import org.apache.paimon.compression.BlockCompressionType;
-import org.apache.paimon.compression.BlockCompressor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.lookup.LookupStoreWriter;
-import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.memory.MemorySlice;
-import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.sst.SstFileWriter;
import org.apache.paimon.utils.BloomFilter;
-import org.apache.paimon.utils.MurmurHashUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.apache.paimon.lookup.sort.BlockHandle.writeBlockHandle;
-import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c;
-import static org.apache.paimon.memory.MemorySegmentUtils.allocateReuseBytes;
-import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
-/** A {@link LookupStoreWriter} for sorting. */
+/** A {@link LookupStoreWriter} backed by an {@link SstFileWriter}. */
public class SortLookupStoreWriter implements LookupStoreWriter {
+ private final SstFileWriter sstFileWriter;
+ private final FileIO fileIO;
+ private final PositionOutputStream out;
- private static final Logger LOG =
- LoggerFactory.getLogger(SortLookupStoreWriter.class.getName());
-
- public static final int MAGIC_NUMBER = 1481571681;
-
- private final BufferedOutputStream fileOutputStream;
- private final int blockSize;
- private final BlockWriter dataBlockWriter;
- private final BlockWriter indexBlockWriter;
- @Nullable private final BloomFilter.Builder bloomFilter;
- private final BlockCompressionType compressionType;
- @Nullable private final BlockCompressor blockCompressor;
-
- private byte[] lastKey;
- private long position;
-
- private long recordCount;
- private long totalUncompressedSize;
- private long totalCompressedSize;
-
- SortLookupStoreWriter(
+ public SortLookupStoreWriter(
File file,
int blockSize,
@Nullable BloomFilter.Builder bloomFilter,
- @Nullable BlockCompressionFactory compressionFactory)
+ BlockCompressionFactory compressionFactory)
throws IOException {
- this.fileOutputStream = new
BufferedOutputStream(Files.newOutputStream(file.toPath()));
- this.blockSize = blockSize;
- this.dataBlockWriter = new BlockWriter((int) (blockSize * 1.1));
- int expectedNumberOfBlocks = 1024;
- this.indexBlockWriter =
- new BlockWriter(BlockHandle.MAX_ENCODED_LENGTH *
expectedNumberOfBlocks);
- this.bloomFilter = bloomFilter;
- if (compressionFactory == null) {
- this.compressionType = BlockCompressionType.NONE;
- this.blockCompressor = null;
- } else {
- this.compressionType = compressionFactory.getCompressionType();
- this.blockCompressor = compressionFactory.getCompressor();
- }
+ final Path filePath = new Path(file.getAbsolutePath());
+ this.fileIO = LocalFileIO.create();
+ this.out = fileIO.newOutputStream(filePath, true);
+ this.sstFileWriter = new SstFileWriter(out, blockSize, bloomFilter,
compressionFactory);
}
@Override
public void put(byte[] key, byte[] value) throws IOException {
- dataBlockWriter.add(key, value);
- if (bloomFilter != null) {
- bloomFilter.addHash(MurmurHashUtils.hashBytes(key));
- }
-
- lastKey = key;
-
- if (dataBlockWriter.memory() > blockSize) {
- flush();
- }
-
- recordCount++;
- }
-
- private void flush() throws IOException {
- if (dataBlockWriter.size() == 0) {
- return;
- }
-
- BlockHandle blockHandle = writeBlock(dataBlockWriter);
- MemorySlice handleEncoding = writeBlockHandle(blockHandle);
- indexBlockWriter.add(lastKey, handleEncoding.copyBytes());
- }
-
- private BlockHandle writeBlock(BlockWriter blockWriter) throws IOException
{
- // close the block
- MemorySlice block = blockWriter.finish();
-
- totalUncompressedSize += block.length();
-
- // attempt to compress the block
- BlockCompressionType blockCompressionType = BlockCompressionType.NONE;
- if (blockCompressor != null) {
- int maxCompressedSize =
blockCompressor.getMaxCompressedSize(block.length());
- byte[] compressed = allocateReuseBytes(maxCompressedSize + 5);
- int offset = encodeInt(compressed, 0, block.length());
- int compressedSize =
- offset
- + blockCompressor.compress(
- block.getHeapMemory(),
- block.offset(),
- block.length(),
- compressed,
- offset);
-
- // Don't use the compressed data if compressed less than 12.5%,
- if (compressedSize < block.length() - (block.length() / 8)) {
- block = new MemorySlice(MemorySegment.wrap(compressed), 0,
compressedSize);
- blockCompressionType = this.compressionType;
- }
- }
-
- totalCompressedSize += block.length();
-
- // create block trailer
- BlockTrailer blockTrailer =
- new BlockTrailer(blockCompressionType, crc32c(block,
blockCompressionType));
- MemorySlice trailer = BlockTrailer.writeBlockTrailer(blockTrailer);
-
- // create a handle to this block
- BlockHandle blockHandle = new BlockHandle(position, block.length());
-
- // write data
- writeSlice(block);
-
- // write trailer: 5 bytes
- writeSlice(trailer);
-
- // clean up state
- blockWriter.reset();
-
- return blockHandle;
+ sstFileWriter.put(key, value);
}
@Override
public void close() throws IOException {
- // flush current data block
- flush();
-
- LOG.info("Number of record: {}", recordCount);
-
- // write bloom filter
- @Nullable BloomFilterHandle bloomFilterHandle = null;
- if (bloomFilter != null) {
- MemorySegment buffer = bloomFilter.getBuffer();
- bloomFilterHandle =
- new BloomFilterHandle(position, buffer.size(),
bloomFilter.expectedEntries());
- writeSlice(MemorySlice.wrap(buffer));
- LOG.info("Bloom filter size: {} bytes",
bloomFilter.getBuffer().size());
- }
-
- // write index block
- BlockHandle indexBlockHandle = writeBlock(indexBlockWriter);
-
- // write footer
- Footer footer = new Footer(bloomFilterHandle, indexBlockHandle);
- MemorySlice footerEncoding = Footer.writeFooter(footer);
- writeSlice(footerEncoding);
-
- // close file
- fileOutputStream.close();
-
- LOG.info("totalUncompressedSize: {}",
MemorySize.ofBytes(totalUncompressedSize));
- LOG.info("totalCompressedSize: {}",
MemorySize.ofBytes(totalCompressedSize));
- }
-
- private void writeSlice(MemorySlice slice) throws IOException {
- fileOutputStream.write(slice.getHeapMemory(), slice.offset(),
slice.length());
- position += slice.length();
+ sstFileWriter.close();
+ out.close();
+ fileIO.close();
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockAlignedType.java
similarity index 97%
rename from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java
rename to
paimon-common/src/main/java/org/apache/paimon/sst/BlockAlignedType.java
index e5849d9f75..43387c9d63 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockAlignedType.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
/** Aligned type for block. */
public enum BlockAlignedType {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
similarity index 80%
rename from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
index 0441a24f22..fbeeba2e77 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
@@ -16,18 +16,18 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.cache.CacheKey;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.io.cache.CacheManager.SegmentContainer;
import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.utils.IOUtils;
import java.io.Closeable;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -37,32 +37,29 @@ import java.util.function.Function;
/** Cache for block reading. */
public class BlockCache implements Closeable {
- private final RandomAccessFile file;
- private final FileChannel channel;
+ private final Path filePath;
+ private final SeekableInputStream input;
private final CacheManager cacheManager;
private final Map<CacheKey, SegmentContainer> blocks;
- public BlockCache(RandomAccessFile file, CacheManager cacheManager) {
- this.file = file;
- this.channel = this.file.getChannel();
+ public BlockCache(Path filePath, SeekableInputStream input, CacheManager
cacheManager) {
+ this.filePath = filePath;
+ this.input = input;
this.cacheManager = cacheManager;
this.blocks = new HashMap<>();
}
private byte[] readFrom(long offset, int length) throws IOException {
byte[] buffer = new byte[length];
- int read = channel.read(ByteBuffer.wrap(buffer), offset);
-
- if (read != length) {
- throw new IOException("Could not read all the data");
- }
+ input.seek(offset);
+ IOUtils.readFully(input, buffer);
return buffer;
}
public MemorySegment getBlock(
long position, int length, Function<byte[], byte[]>
decompressFunc, boolean isIndex) {
- CacheKey cacheKey = CacheKey.forPosition(file, position, length,
isIndex);
+ CacheKey cacheKey = CacheKey.forPosition(filePath, position, length,
isIndex);
SegmentContainer container = blocks.get(cacheKey);
if (container == null || container.getAccessCount() ==
CacheManager.REFRESH_COUNT) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockEntry.java
similarity index 98%
rename from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockEntry.java
index 4473886e31..efb82e9fb6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockEntry.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import org.apache.paimon.memory.MemorySlice;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java
similarity index 98%
rename from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java
index 737f57a8b0..60d4c5929c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceInput;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java
similarity index 98%
rename from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java
index 0ca04e91d7..65913957e3 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceInput;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java
similarity index 96%
rename from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java
index a5bbef74ec..d7596bafa4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java
@@ -16,13 +16,13 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import org.apache.paimon.memory.MemorySlice;
import java.util.Comparator;
-import static org.apache.paimon.lookup.sort.BlockAlignedType.ALIGNED;
+import static org.apache.paimon.sst.BlockAlignedType.ALIGNED;
/** Reader for a block. */
public class BlockReader {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockTrailer.java
similarity index 98%
rename from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockTrailer.java
index 6d49bd9cc5..05d310fa12 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockTrailer.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import org.apache.paimon.compression.BlockCompressionType;
import org.apache.paimon.memory.MemorySlice;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockWriter.java
similarity index 69%
rename from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockWriter.java
index 340abac862..8823f47385 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockWriter.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceOutput;
@@ -24,10 +24,34 @@ import org.apache.paimon.utils.IntArrayList;
import java.io.IOException;
-import static org.apache.paimon.lookup.sort.BlockAlignedType.ALIGNED;
-import static org.apache.paimon.lookup.sort.BlockAlignedType.UNALIGNED;
+import static org.apache.paimon.sst.BlockAlignedType.ALIGNED;
+import static org.apache.paimon.sst.BlockAlignedType.UNALIGNED;
-/** Writer to build a Block. */
+/**
+ * Writer to build a Block. A block is designed for storing and
random-accessing k-v pairs. The
+ * layout is as below:
+ *
+ * <pre>
+ * +---------------+
+ * | Block Trailer |
+ * +------------------------------------------------+
* | Block CRC32C | Compression |
+ * +------------------------------------------------+
+ * +---------------+
+ * | Block Data |
+ * +---------------+--------------------------------+----+
+ * | key len | key bytes | value len | value bytes | |
+ * +------------------------------------------------+ |
+ * | key len | key bytes | value len | value bytes | +-> Key-Value
pairs
+ * +------------------------------------------------+ |
+ * | ... ... | |
+ * +------------------------------------------------+----+
+ * | entry pos | entry pos | ... | entry pos | +-> optional, for
unaligned block
+ * +------------------------------------------------+----+
+ * | entry num / entry size | aligned type |
+ * +------------------------------------------------+
+ * </pre>
+ */
public class BlockWriter {
private final IntArrayList positions;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java
b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
similarity index 98%
rename from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java
rename to
paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
index 7ec6a845c5..7f3804dd8d 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import java.util.Objects;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java
b/paimon-common/src/main/java/org/apache/paimon/sst/Footer.java
similarity index 96%
rename from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/Footer.java
index b8ae517891..7855d18a94 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/Footer.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceInput;
@@ -26,7 +26,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
-import static org.apache.paimon.lookup.sort.SortLookupStoreWriter.MAGIC_NUMBER;
+import static org.apache.paimon.sst.SstFileWriter.MAGIC_NUMBER;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Footer for a sorted file. */
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java
b/paimon-common/src/main/java/org/apache/paimon/sst/SortContext.java
similarity index 96%
rename from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/SortContext.java
index 5aacb56bb7..797def3fdb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SortContext.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import org.apache.paimon.lookup.LookupStoreFactory.Context;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
similarity index 85%
copy from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
copy to paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
index fd068baaa2..cf10e13a05 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
@@ -16,13 +16,13 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockDecompressor;
-import org.apache.paimon.io.PageFileInput;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.cache.CacheManager;
-import org.apache.paimon.lookup.LookupStoreReader;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceInput;
@@ -31,41 +31,48 @@ import org.apache.paimon.utils.MurmurHashUtils;
import javax.annotation.Nullable;
-import java.io.File;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Comparator;
-import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c;
+import static org.apache.paimon.sst.SstFileUtils.crc32c;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-/** A {@link LookupStoreReader} for sort store. */
-public class SortLookupStoreReader implements LookupStoreReader {
+/**
+ * An SST file reader that currently supports only point lookups.
+ *
+ * <p>Note that this class is NOT thread-safe.
+ */
+public class SstFileReader implements Closeable {
private final Comparator<MemorySlice> comparator;
- private final String filePath;
+ private final Path filePath;
private final long fileSize;
private final BlockIterator indexBlockIterator;
@Nullable private FileBasedBloomFilter bloomFilter;
private final BlockCache blockCache;
- private final PageFileInput fileInput;
- public SortLookupStoreReader(
- Comparator<MemorySlice> comparator, File file, int blockSize,
CacheManager cacheManager)
+ public SstFileReader(
+ Comparator<MemorySlice> comparator,
+ long fileSize,
+ Path filePath,
+ SeekableInputStream input,
+ CacheManager cacheManager)
throws IOException {
this.comparator = comparator;
- this.filePath = file.getAbsolutePath();
- this.fileSize = file.length();
+ this.filePath = filePath;
+ this.fileSize = fileSize;
- this.fileInput = PageFileInput.create(file, blockSize, null, fileSize,
null);
- this.blockCache = new BlockCache(fileInput.file(), cacheManager);
+ this.blockCache = new BlockCache(filePath, input, cacheManager);
Footer footer = readFooter();
this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(),
true).iterator();
BloomFilterHandle handle = footer.getBloomFilterHandle();
if (handle != null) {
this.bloomFilter =
new FileBasedBloomFilter(
- fileInput,
+ input,
+ filePath,
cacheManager,
handle.expectedEntries(),
handle.offset(),
@@ -80,8 +87,13 @@ public class SortLookupStoreReader implements
LookupStoreReader {
return Footer.readFooter(MemorySlice.wrap(footerData).toInput());
}
+ /**
+ * Looks up the specified key in the file.
+ *
+ * @param key serialized key
+ * @return the corresponding serialized value, or null if the key is not found
+ */
@Nullable
- @Override
public byte[] lookup(byte[] key) throws IOException {
if (bloomFilter != null &&
!bloomFilter.testHash(MurmurHashUtils.hashBytes(key))) {
return null;
@@ -171,6 +183,6 @@ public class SortLookupStoreReader implements
LookupStoreReader {
bloomFilter.close();
}
blockCache.close();
- fileInput.close();
+ // The input stream is owned by the caller, which is responsible for closing it.
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreUtils.java
b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileUtils.java
similarity index 95%
rename from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreUtils.java
rename to paimon-common/src/main/java/org/apache/paimon/sst/SstFileUtils.java
index 8339ac01f8..c1a15ab91b 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileUtils.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import org.apache.paimon.compression.BlockCompressionType;
import org.apache.paimon.memory.MemorySegment;
@@ -25,7 +25,7 @@ import org.apache.paimon.memory.MemorySlice;
import java.util.zip.CRC32;
/** Utils for sort lookup store. */
-public class SortLookupStoreUtils {
+public class SstFileUtils {
public static int crc32c(MemorySlice data, BlockCompressionType type) {
CRC32 crc = new CRC32();
crc.update(data.getHeapMemory(), data.offset(), data.length());
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
similarity index 76%
copy from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
copy to paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
index 31f09cfbc6..8eb234b564 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
@@ -16,12 +16,12 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockCompressionType;
import org.apache.paimon.compression.BlockCompressor;
-import org.apache.paimon.lookup.LookupStoreWriter;
+import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.options.MemorySize;
@@ -33,25 +33,42 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.io.BufferedOutputStream;
-import java.io.File;
+import java.io.Closeable;
import java.io.IOException;
-import java.nio.file.Files;
-import static org.apache.paimon.lookup.sort.BlockHandle.writeBlockHandle;
-import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c;
import static org.apache.paimon.memory.MemorySegmentUtils.allocateReuseBytes;
+import static org.apache.paimon.sst.BlockHandle.writeBlockHandle;
+import static org.apache.paimon.sst.SstFileUtils.crc32c;
import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
-/** A {@link LookupStoreWriter} for sorting. */
-public class SortLookupStoreWriter implements LookupStoreWriter {
+/**
+ * Writer for SST files. SST files are row-oriented and designed to serve frequent point
+ * queries and range queries by key. The SST file layout is shown below (for the layout of each
+ * block type, refer to the corresponding classes):
+ *
+ * <pre>
+ * +-----------------------------------+------+
+ * | Footer | |
+ * +-----------------------------------+ |
+ * | Index Block | +--> Loaded on open
+ * +-----------------------------------+ |
+ * | Bloom Filter Block | |
+ * +-----------------------------------+------+
+ * | Data Block | |
+ * +-----------------------------------+ |
+ * | ...... | +--> Loaded on request
+ * +-----------------------------------+ |
+ * | Data Block | |
+ * +-----------------------------------+------+
+ * </pre>
+ */
+public class SstFileWriter implements Closeable {
- private static final Logger LOG =
- LoggerFactory.getLogger(SortLookupStoreWriter.class.getName());
+ private static final Logger LOG =
LoggerFactory.getLogger(SstFileWriter.class.getName());
public static final int MAGIC_NUMBER = 1481571681;
- private final BufferedOutputStream fileOutputStream;
+ private final PositionOutputStream out;
private final int blockSize;
private final BlockWriter dataBlockWriter;
private final BlockWriter indexBlockWriter;
@@ -66,13 +83,13 @@ public class SortLookupStoreWriter implements
LookupStoreWriter {
private long totalUncompressedSize;
private long totalCompressedSize;
- SortLookupStoreWriter(
- File file,
+ public SstFileWriter(
+ PositionOutputStream out,
int blockSize,
@Nullable BloomFilter.Builder bloomFilter,
@Nullable BlockCompressionFactory compressionFactory)
throws IOException {
- this.fileOutputStream = new
BufferedOutputStream(Files.newOutputStream(file.toPath()));
+ this.out = out;
this.blockSize = blockSize;
this.dataBlockWriter = new BlockWriter((int) (blockSize * 1.1));
int expectedNumberOfBlocks = 1024;
@@ -88,7 +105,14 @@ public class SortLookupStoreWriter implements
LookupStoreWriter {
}
}
- @Override
+ /**
+ * Puts the serialized key and value into this SST file. The caller must guarantee that keys
+ * are put in monotonically increasing order according to the comparator used by {@link
+ * SstFileReader}; otherwise, the results of lookups and range queries are undefined.
+ *
+ * @param key serialized key
+ * @param value serialized value
+ */
public void put(byte[] key, byte[] value) throws IOException {
dataBlockWriter.add(key, value);
if (bloomFilter != null) {
@@ -189,15 +213,14 @@ public class SortLookupStoreWriter implements
LookupStoreWriter {
MemorySlice footerEncoding = Footer.writeFooter(footer);
writeSlice(footerEncoding);
- // close file
- fileOutputStream.close();
+ // The output stream is owned by the caller, which is responsible for closing it.
LOG.info("totalUncompressedSize: {}",
MemorySize.ofBytes(totalUncompressedSize));
LOG.info("totalCompressedSize: {}",
MemorySize.ofBytes(totalCompressedSize));
}
private void writeSlice(MemorySlice slice) throws IOException {
- fileOutputStream.write(slice.getHeapMemory(), slice.offset(),
slice.length());
+ out.write(slice.getHeapMemory(), slice.offset(), slice.length()); // written straight to the caller's stream; no extra buffering layer is added here
position += slice.length();
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
index ede7a8e3cf..693c3d2bc7 100644
---
a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
@@ -19,7 +19,8 @@
package org.apache.paimon.utils;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.io.PageFileInput;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.cache.CacheCallback;
import org.apache.paimon.io.cache.CacheKey;
import org.apache.paimon.io.cache.CacheManager;
@@ -34,16 +35,18 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Util to apply a built bloom filter . */
public class FileBasedBloomFilter implements Closeable {
- private final PageFileInput input;
+ private final SeekableInputStream input;
private final CacheManager cacheManager;
private final BloomFilter filter;
private final long readOffset;
- private final int readLength;
private final CacheKey cacheKey;
+ // Each bloom filter is used by only a single file reader, so the page buffer can be reused.
+ private final byte[] reusedPageBuffer;
private int accessCount;
public FileBasedBloomFilter(
- PageFileInput input,
+ SeekableInputStream input,
+ Path filePath,
CacheManager cacheManager,
long expectedEntries,
long readOffset,
@@ -53,9 +56,9 @@ public class FileBasedBloomFilter implements Closeable {
checkArgument(expectedEntries >= 0);
this.filter = new BloomFilter(expectedEntries, readLength);
this.readOffset = readOffset;
- this.readLength = readLength;
this.accessCount = 0;
- this.cacheKey = CacheKey.forPosition(input.file(), readOffset,
readLength, true);
+ this.cacheKey = CacheKey.forPosition(filePath, readOffset, readLength,
true);
+ this.reusedPageBuffer = new byte[readLength];
}
public boolean testHash(int hash) {
@@ -65,15 +68,19 @@ public class FileBasedBloomFilter implements Closeable {
if (accessCount == REFRESH_COUNT || filter.getMemorySegment() == null)
{
MemorySegment segment =
cacheManager.getPage(
- cacheKey,
- key -> input.readPosition(readOffset, readLength),
- new BloomFilterCallBack(filter));
+ cacheKey, key -> readPage(), new
BloomFilterCallBack(filter));
filter.setMemorySegment(segment, 0);
accessCount = 0;
}
return filter.testHash(hash);
}
+ private byte[] readPage() throws IOException {
+ input.seek(readOffset); // the serialized bloom filter starts at readOffset
+ IOUtils.readFully(input, reusedPageBuffer); // expected to fill all readLength bytes of the buffer
+ return reusedPageBuffer; // always the same (reused) array; see the field comment
+ }
+
@VisibleForTesting
BloomFilter bloomFilter() {
return filter;
diff --git
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/BlockIteratorTest.java
b/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
similarity index 99%
rename from
paimon-common/src/test/java/org/apache/paimon/lookup/sort/BlockIteratorTest.java
rename to
paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
index 71c75ff95a..004b920a4e 100644
---
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/BlockIteratorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.sst;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceOutput;
diff --git a/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java
b/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java
new file mode 100644
index 0000000000..22aa27feb3
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.sst;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceOutput;
+import org.apache.paimon.options.MemorySize;
+import
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
+import org.apache.paimon.testutils.junit.parameterized.Parameters;
+import org.apache.paimon.utils.BloomFilter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/** Test for {@link SstFileReader} and {@link SstFileWriter}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class SstFileTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(SstFileTest.class);
+
+ // Sized for roughly 256 records per block, assuming ~10 encoded bytes per record.
+ private static final int BLOCK_SIZE = (10) * 256;
+ private static final CacheManager CACHE_MANAGER = new
CacheManager(MemorySize.ofMebiBytes(10));
+ @TempDir java.nio.file.Path tempPath;
+
+ private final boolean bloomFilterEnabled;
+ private final CompressOptions compress;
+
+ private FileIO fileIO;
+ private Path file;
+ private Path parent; // NOTE(review): assigned in beforeEach() but never read in this test
+
+ public SstFileTest(List<Object> var) {
+ this.bloomFilterEnabled = (Boolean) var.get(0);
+ this.compress = new CompressOptions((String) var.get(1), 1);
+ }
+
+ @SuppressWarnings("unused")
+ @Parameters(name = "enableBf&compress-{0}")
+ public static List<List<Object>> getVarSeg() {
+ return Arrays.asList(
+ Arrays.asList(true, "none"),
+ Arrays.asList(false, "none"),
+ Arrays.asList(false, "lz4"),
+ Arrays.asList(true, "lz4"),
+ Arrays.asList(false, "zstd"),
+ Arrays.asList(true, "zstd"));
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ this.fileIO = LocalFileIO.create();
+ this.parent = new Path(tempPath.toUri());
+ this.file = new Path(new Path(tempPath.toUri()),
UUID.randomUUID().toString());
+ }
+
+ @TestTemplate
+ public void testLookup() throws Exception {
+ writeData(5000, bloomFilterEnabled);
+ innerTestLookup();
+ }
+
+ private void innerTestLookup() throws Exception {
+ long fileSize = fileIO.getFileSize(file);
+ try (SeekableInputStream inputStream = fileIO.newInputStream(file);
+ SstFileReader reader = // try-with-resources closes reader before inputStream
+ new SstFileReader(
+ Comparator.comparingInt(slice ->
slice.readInt(0)),
+ fileSize,
+ file,
+ inputStream,
+ CACHE_MANAGER); ) {
+ Random random = new Random(); // NOTE(review): unseeded, so a failing key is not reproducible
+ MemorySliceOutput keyOut = new MemorySliceOutput(4);
+
+ // 1. look up random existing keys; writeData stores value == key
+ for (int i = 0; i < 100; i++) {
+ int key = random.nextInt(5000);
+ keyOut.reset();
+ keyOut.writeInt(key);
+ byte[] queried =
reader.lookup(keyOut.toSlice().getHeapMemory());
+ Assertions.assertNotNull(queried);
+ Assertions.assertEquals(key,
MemorySlice.wrap(queried).readInt(0));
+ }
+
+ // 2. look up the first key (0), an interior key (511) and the last key (4999)
+ keyOut.reset();
+ keyOut.writeInt(0);
+ byte[] queried = reader.lookup(keyOut.toSlice().getHeapMemory());
+ Assertions.assertNotNull(queried);
+ Assertions.assertEquals(0, MemorySlice.wrap(queried).readInt(0));
+
+ keyOut.reset();
+ keyOut.writeInt(511);
+ byte[] queried1 = reader.lookup(keyOut.toSlice().getHeapMemory());
+ Assertions.assertNotNull(queried1);
+ Assertions.assertEquals(511,
MemorySlice.wrap(queried1).readInt(0));
+
+ keyOut.reset();
+ keyOut.writeInt(4999);
+ byte[] queried2 = reader.lookup(keyOut.toSlice().getHeapMemory());
+ Assertions.assertNotNull(queried2);
+ Assertions.assertEquals(4999,
MemorySlice.wrap(queried2).readInt(0));
+
+ // 3. keys smaller than the first key must not be found
+ for (int i = 0; i < 100; i++) {
+ keyOut.reset();
+ keyOut.writeInt(-10 - i);
+
Assertions.assertNull(reader.lookup(keyOut.toSlice().getHeapMemory()));
+ }
+
+ // 4. keys greater than the last key must not be found
+ for (int i = 0; i < 100; i++) {
+ keyOut.reset();
+ keyOut.writeInt(10000 + i);
+
Assertions.assertNull(reader.lookup(keyOut.toSlice().getHeapMemory()));
+ }
+ }
+ }
+
+ private void writeData(int recordCount, boolean withBloomFilter) throws
Exception {
+ BloomFilter.Builder bloomFilter = null;
+ if (withBloomFilter) {
+ bloomFilter = BloomFilter.builder(recordCount, 0.05); // presumably a 5% target false-positive rate
+ }
+ BlockCompressionFactory compressionFactory =
BlockCompressionFactory.create(compress);
+ try (PositionOutputStream outputStream = fileIO.newOutputStream(file,
true);
+ SstFileWriter writer = // try-with-resources closes writer before outputStream
+ new SstFileWriter(
+ outputStream, BLOCK_SIZE, bloomFilter,
compressionFactory); ) {
+ MemorySliceOutput keyOut = new MemorySliceOutput(4);
+ MemorySliceOutput valueOut = new MemorySliceOutput(4);
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < recordCount; i++) {
+ keyOut.reset();
+ valueOut.reset();
+ keyOut.writeInt(i);
+ valueOut.writeInt(i);
+ writer.put(keyOut.toSlice().getHeapMemory(),
valueOut.toSlice().getHeapMemory());
+ }
+ LOG.info("Write {} data cost {} ms", recordCount,
System.currentTimeMillis() - start);
+ }
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
index d1471fd74a..8ff1da558c 100644
---
a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
@@ -18,7 +18,8 @@
package org.apache.paimon.utils;
-import org.apache.paimon.io.PageFileInput;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.cache.Cache;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.memory.MemorySegment;
@@ -62,16 +63,14 @@ public class FileBasedBloomFilterTest {
BloomFilter.Builder builder = new BloomFilter.Builder(segment, 100);
int[] inputs = CommonTestUtils.generateRandomInts(100);
Arrays.stream(inputs).forEach(i ->
builder.addHash(Integer.hashCode(i)));
- File file = writeFile(segment.getArray());
+ org.apache.paimon.fs.Path filePath =
+ new
org.apache.paimon.fs.Path(writeFile(segment.getArray()).getAbsolutePath());
+ FileIO fileIO = LocalFileIO.create();
CacheManager cacheManager = new CacheManager(cacheType,
MemorySize.ofMebiBytes(1), 0.1);
FileBasedBloomFilter filter =
new FileBasedBloomFilter(
- PageFileInput.create(file, 1024, null, 0, null),
- cacheManager,
- 100,
- 0,
- 1000);
+ fileIO.newInputStream(filePath), filePath,
cacheManager, 100, 0, 1000);
Arrays.stream(inputs)
.forEach(i ->
Assertions.assertThat(filter.testHash(Integer.hashCode(i))).isTrue());