This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e34762dfad [core] Refactor footer of lookup sst file to SortLookupStoreFooter
e34762dfad is described below
commit e34762dfad5a80f5054c75205962963def89768c
Author: JingsongLi <[email protected]>
AuthorDate: Thu Dec 25 21:59:17 2025 +0800
[core] Refactor footer of lookup sst file to SortLookupStoreFooter
---
.../paimon/lookup/sort/SortLookupStoreFactory.java | 12 ++++-
.../sort/SortLookupStoreFooter.java} | 19 +++----
.../paimon/lookup/sort/SortLookupStoreReader.java | 32 ++++++++---
.../paimon/lookup/sort/SortLookupStoreWriter.java | 43 +++++++++++----
.../java/org/apache/paimon/sst/BlockHandle.java | 3 +-
.../org/apache/paimon/sst/BloomFilterHandle.java | 2 +-
.../java/org/apache/paimon/sst/SstFileReader.java | 41 ++++----------
.../java/org/apache/paimon/sst/SstFileWriter.java | 62 ++++++++++++----------
.../apache/paimon/utils/FileBasedBloomFilter.java | 21 ++++++++
.../sort/SortLookupStoreTest.java} | 55 +++++++++----------
.../org/apache/paimon/sst/BlockIteratorTest.java | 1 +
11 files changed, 173 insertions(+), 118 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
index cbf01d590e..546a68e18f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
@@ -20,6 +20,10 @@ package org.apache.paimon.lookup.sort;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.memory.MemorySlice;
@@ -52,12 +56,16 @@ public class SortLookupStoreFactory implements LookupStoreFactory {
@Override
public SortLookupStoreReader createReader(File file) throws IOException {
- return new SortLookupStoreReader(comparator, file, cacheManager);
+ Path filePath = new Path(file.getAbsolutePath());
+ SeekableInputStream input =
LocalFileIO.INSTANCE.newInputStream(filePath);
+ return new SortLookupStoreReader(comparator, filePath, file.length(),
input, cacheManager);
}
@Override
public SortLookupStoreWriter createWriter(File file, @Nullable
BloomFilter.Builder bloomFilter)
throws IOException {
- return new SortLookupStoreWriter(file, blockSize, bloomFilter,
compressionFactory);
+ Path filePath = new Path(file.getAbsolutePath());
+ PositionOutputStream out =
LocalFileIO.INSTANCE.newOutputStream(filePath, true);
+ return new SortLookupStoreWriter(out, blockSize, bloomFilter,
compressionFactory);
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/Footer.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
similarity index 83%
rename from paimon-common/src/main/java/org/apache/paimon/sst/Footer.java
rename to paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
index 7855d18a94..9f4272096c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/Footer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
@@ -16,28 +16,29 @@
* limitations under the License.
*/
-package org.apache.paimon.sst;
+package org.apache.paimon.lookup.sort;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceInput;
import org.apache.paimon.memory.MemorySliceOutput;
+import org.apache.paimon.sst.BlockHandle;
+import org.apache.paimon.sst.BloomFilterHandle;
import javax.annotation.Nullable;
-import java.io.IOException;
-
import static org.apache.paimon.sst.SstFileWriter.MAGIC_NUMBER;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Footer for a sorted file. */
-public class Footer {
+public class SortLookupStoreFooter {
public static final int ENCODED_LENGTH = 36;
@Nullable private final BloomFilterHandle bloomFilterHandle;
private final BlockHandle indexBlockHandle;
- Footer(@Nullable BloomFilterHandle bloomFilterHandle, BlockHandle
indexBlockHandle) {
+ public SortLookupStoreFooter(
+ @Nullable BloomFilterHandle bloomFilterHandle, BlockHandle
indexBlockHandle) {
this.bloomFilterHandle = bloomFilterHandle;
this.indexBlockHandle = indexBlockHandle;
}
@@ -51,7 +52,7 @@ public class Footer {
return indexBlockHandle;
}
- public static Footer readFooter(MemorySliceInput sliceInput) throws
IOException {
+ public static SortLookupStoreFooter readFooter(MemorySliceInput
sliceInput) {
// read bloom filter and index handles
@Nullable
BloomFilterHandle bloomFilterHandle =
@@ -71,16 +72,16 @@ public class Footer {
int magicNumber = sliceInput.readInt();
checkArgument(magicNumber == MAGIC_NUMBER, "File is not a table (bad
magic number)");
- return new Footer(bloomFilterHandle, indexBlockHandle);
+ return new SortLookupStoreFooter(bloomFilterHandle, indexBlockHandle);
}
- public static MemorySlice writeFooter(Footer footer) {
+ public static MemorySlice writeFooter(SortLookupStoreFooter footer) {
MemorySliceOutput output = new MemorySliceOutput(ENCODED_LENGTH);
writeFooter(footer, output);
return output.toSlice();
}
- public static void writeFooter(Footer footer, MemorySliceOutput
sliceOutput) {
+ public static void writeFooter(SortLookupStoreFooter footer,
MemorySliceOutput sliceOutput) {
// write bloom filter and index handles
if (footer.bloomFilterHandle == null) {
sliceOutput.writeLong(0);
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
index ef862e6a3a..63b060fee9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
@@ -20,15 +20,16 @@ package org.apache.paimon.lookup.sort;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreReader;
+import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.sst.BlockCache;
import org.apache.paimon.sst.SstFileReader;
+import org.apache.paimon.utils.FileBasedBloomFilter;
import javax.annotation.Nullable;
-import java.io.File;
import java.io.IOException;
import java.util.Comparator;
@@ -39,11 +40,24 @@ public class SortLookupStoreReader implements LookupStoreReader {
private final SstFileReader reader;
public SortLookupStoreReader(
- Comparator<MemorySlice> comparator, File file, CacheManager
cacheManager)
- throws IOException {
- Path filePath = new Path(file.getAbsolutePath());
- this.input = LocalFileIO.INSTANCE.newInputStream(filePath);
- this.reader = new SstFileReader(comparator, file.length(), filePath,
input, cacheManager);
+ Comparator<MemorySlice> comparator,
+ Path filePath,
+ long fileLen,
+ SeekableInputStream input,
+ CacheManager cacheManager) {
+ this.input = input;
+ BlockCache blockCache = new BlockCache(filePath, input, cacheManager);
+ int footerLen = SortLookupStoreFooter.ENCODED_LENGTH;
+ MemorySegment footerData =
+ blockCache.getBlock(fileLen - footerLen, footerLen, b -> b,
true);
+ SortLookupStoreFooter footer =
+
SortLookupStoreFooter.readFooter(MemorySlice.wrap(footerData).toInput());
+ FileBasedBloomFilter bloomFilter =
+ FileBasedBloomFilter.create(
+ input, filePath, cacheManager,
footer.getBloomFilterHandle());
+ this.reader =
+ new SstFileReader(
+ comparator, blockCache, footer.getIndexBlockHandle(),
bloomFilter);
}
@Nullable
@@ -52,6 +66,10 @@ public class SortLookupStoreReader implements LookupStoreReader {
return reader.lookup(key);
}
+ public SstFileReader.SstFileIterator createIterator() {
+ return reader.createIterator();
+ }
+
@Override
public void close() throws IOException {
reader.close();
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
index e8ed55b96b..e5fab5fc5c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
@@ -19,32 +19,49 @@
package org.apache.paimon.lookup.sort;
import org.apache.paimon.compression.BlockCompressionFactory;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
-import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.lookup.LookupStoreWriter;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.sst.BlockHandle;
+import org.apache.paimon.sst.BloomFilterHandle;
import org.apache.paimon.sst.SstFileWriter;
import org.apache.paimon.utils.BloomFilter;
import javax.annotation.Nullable;
-import java.io.File;
import java.io.IOException;
-/** A {@link LookupStoreWriter} backed by an {@link SstFileWriter}. */
+/**
+ * A {@link LookupStoreWriter} backed by an {@link SstFileWriter}. The SST
File layout is as below:
+ * (For layouts of each block type, please refer to corresponding classes)
+ *
+ * <pre>
+ * +-----------------------------------+------+
+ * | Footer | |
+ * +-----------------------------------+ |
+ * | Index Block | +--> Loaded on open
+ * +-----------------------------------+ |
+ * | Bloom Filter Block | |
+ * +-----------------------------------+------+
+ * | Data Block | |
+ * +-----------------------------------+ |
+ * | ...... | +--> Loaded on requested
+ * +-----------------------------------+ |
+ * | Data Block | |
+ * +-----------------------------------+------+
+ * </pre>
+ */
public class SortLookupStoreWriter implements LookupStoreWriter {
private final SstFileWriter writer;
private final PositionOutputStream out;
public SortLookupStoreWriter(
- File file,
+ PositionOutputStream out,
int blockSize,
@Nullable BloomFilter.Builder bloomFilter,
- BlockCompressionFactory compressionFactory)
- throws IOException {
- Path filePath = new Path(file.getAbsolutePath());
- this.out = LocalFileIO.INSTANCE.newOutputStream(filePath, true);
+ BlockCompressionFactory compressionFactory) {
+ this.out = out;
this.writer = new SstFileWriter(out, blockSize, bloomFilter,
compressionFactory);
}
@@ -55,7 +72,13 @@ public class SortLookupStoreWriter implements LookupStoreWriter {
@Override
public void close() throws IOException {
- writer.close();
+ writer.flush();
+ BloomFilterHandle bloomFilterHandle = writer.writeBloomFilter();
+ BlockHandle indexBlockHandle = writer.writeIndexBlock();
+ SortLookupStoreFooter footer =
+ new SortLookupStoreFooter(bloomFilterHandle, indexBlockHandle);
+ MemorySlice footerEncoding = SortLookupStoreFooter.writeFooter(footer);
+ writer.writeSlice(footerEncoding);
out.close();
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java
index 60d4c5929c..0ced616cf3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java
@@ -24,12 +24,13 @@ import org.apache.paimon.memory.MemorySliceOutput;
/** Handle for a block. */
public class BlockHandle {
+
public static final int MAX_ENCODED_LENGTH = 9 + 5;
private final long offset;
private final int size;
- BlockHandle(long offset, int size) {
+ public BlockHandle(long offset, int size) {
this.offset = offset;
this.size = size;
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
index dbcb962481..05e9f684bc 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
@@ -27,7 +27,7 @@ public class BloomFilterHandle {
private final int size;
private final long expectedEntries;
- BloomFilterHandle(long offset, int size, long expectedEntries) {
+ public BloomFilterHandle(long offset, int size, long expectedEntries) {
this.offset = offset;
this.size = size;
this.expectedEntries = expectedEntries;
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
index 26148d0184..788e926dc7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
@@ -20,9 +20,6 @@ package org.apache.paimon.sst;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockDecompressor;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceInput;
@@ -48,39 +45,19 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
public class SstFileReader implements Closeable {
private final Comparator<MemorySlice> comparator;
- private final Path filePath;
private final BlockCache blockCache;
private final BlockReader indexBlock;
@Nullable private final FileBasedBloomFilter bloomFilter;
public SstFileReader(
Comparator<MemorySlice> comparator,
- long fileSize,
- Path filePath,
- SeekableInputStream input,
- CacheManager cacheManager)
- throws IOException {
+ BlockCache blockCache,
+ BlockHandle indexBlockHandle,
+ @Nullable FileBasedBloomFilter bloomFilter) {
this.comparator = comparator;
- this.filePath = filePath;
- this.blockCache = new BlockCache(filePath, input, cacheManager);
- MemorySegment footerData =
- blockCache.getBlock(
- fileSize - Footer.ENCODED_LENGTH,
Footer.ENCODED_LENGTH, b -> b, true);
- Footer footer =
Footer.readFooter(MemorySlice.wrap(footerData).toInput());
- this.indexBlock = readBlock(footer.getIndexBlockHandle(), true);
- BloomFilterHandle handle = footer.getBloomFilterHandle();
- if (handle == null) {
- this.bloomFilter = null;
- } else {
- this.bloomFilter =
- new FileBasedBloomFilter(
- input,
- filePath,
- cacheManager,
- handle.expectedEntries(),
- handle.offset(),
- handle.size());
- }
+ this.blockCache = blockCache;
+ this.indexBlock = readBlock(indexBlockHandle, true);
+ this.bloomFilter = bloomFilter;
}
/**
@@ -154,8 +131,8 @@ public class SstFileReader implements Closeable {
checkArgument(
blockTrailer.getCrc32c() == crc32cCode,
String.format(
- "Expected CRC32C(%d) but found CRC32C(%d) for
file(%s)",
- blockTrailer.getCrc32c(), crc32cCode, filePath));
+ "Expected CRC32C(%d) but found CRC32C(%d)",
+ blockTrailer.getCrc32c(), crc32cCode));
// decompress data
BlockCompressionFactory compressionFactory =
@@ -184,11 +161,11 @@ public class SstFileReader implements Closeable {
bloomFilter.close();
}
blockCache.close();
- // do not need to close input, since it will be closed by outer classes
}
/** An Iterator for range queries. */
public class SstFileIterator {
+
private final BlockIterator indexIterator;
private @Nullable BlockIterator seekedDataBlock = null;
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
index 603899378b..1a8e2d8bda 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockCompressionType;
import org.apache.paimon.compression.BlockCompressor;
import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.lookup.sort.SortLookupStoreFooter;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.options.MemorySize;
@@ -33,7 +34,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.io.Closeable;
import java.io.IOException;
import static org.apache.paimon.memory.MemorySegmentUtils.allocateReuseBytes;
@@ -43,26 +43,9 @@ import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
/**
* The writer for writing SST Files. SST Files are row-oriented and designed
to serve frequent point
- * queries and range queries by key. The SST File layout is as below: (For
layouts of each block
- * type, please refer to corresponding classes)
- *
- * <pre>
- * +-----------------------------------+------+
- * | Footer | |
- * +-----------------------------------+ |
- * | Index Block | +--> Loaded on open
- * +-----------------------------------+ |
- * | Bloom Filter Block | |
- * +-----------------------------------+------+
- * | Data Block | |
- * +-----------------------------------+ |
- * | ...... | +--> Loaded on requested
- * +-----------------------------------+ |
- * | Data Block | |
- * +-----------------------------------+------+
- * </pre>
+ * queries and range queries by key.
*/
-public class SstFileWriter implements Closeable {
+public class SstFileWriter {
private static final Logger LOG =
LoggerFactory.getLogger(SstFileWriter.class.getName());
@@ -77,7 +60,6 @@ public class SstFileWriter implements Closeable {
@Nullable private final BlockCompressor blockCompressor;
private byte[] lastKey;
- private long position;
private long recordCount;
private long totalUncompressedSize;
@@ -127,7 +109,7 @@ public class SstFileWriter implements Closeable {
recordCount++;
}
- private void flush() throws IOException {
+ public void flush() throws IOException {
if (dataBlockWriter.size() == 0) {
return;
}
@@ -173,7 +155,7 @@ public class SstFileWriter implements Closeable {
MemorySlice trailer = BlockTrailer.writeBlockTrailer(blockTrailer);
// create a handle to this block
- BlockHandle blockHandle = new BlockHandle(position, block.length());
+ BlockHandle blockHandle = new BlockHandle(out.getPos(),
block.length());
// write data
writeSlice(block);
@@ -187,7 +169,28 @@ public class SstFileWriter implements Closeable {
return blockHandle;
}
- @Override
+ @Nullable
+ public BloomFilterHandle writeBloomFilter() throws IOException {
+ if (bloomFilter == null) {
+ return null;
+ }
+ MemorySegment buffer = bloomFilter.getBuffer();
+ BloomFilterHandle bloomFilterHandle =
+ new BloomFilterHandle(out.getPos(), buffer.size(),
bloomFilter.expectedEntries());
+ writeSlice(MemorySlice.wrap(buffer));
+ LOG.info("Bloom filter size: {} bytes",
bloomFilter.getBuffer().size());
+ return bloomFilterHandle;
+ }
+
+ @Nullable
+ public BlockHandle writeIndexBlock() throws IOException {
+ BlockHandle indexBlock = writeBlock(indexBlockWriter);
+ LOG.info("Number of record: {}", recordCount);
+ LOG.info("totalUncompressedSize: {}",
MemorySize.ofBytes(totalUncompressedSize));
+ LOG.info("totalCompressedSize: {}",
MemorySize.ofBytes(totalCompressedSize));
+ return indexBlock;
+ }
+
public void close() throws IOException {
// flush current data block
flush();
@@ -199,7 +202,8 @@ public class SstFileWriter implements Closeable {
if (bloomFilter != null) {
MemorySegment buffer = bloomFilter.getBuffer();
bloomFilterHandle =
- new BloomFilterHandle(position, buffer.size(),
bloomFilter.expectedEntries());
+ new BloomFilterHandle(
+ out.getPos(), buffer.size(),
bloomFilter.expectedEntries());
writeSlice(MemorySlice.wrap(buffer));
LOG.info("Bloom filter size: {} bytes",
bloomFilter.getBuffer().size());
}
@@ -208,8 +212,9 @@ public class SstFileWriter implements Closeable {
BlockHandle indexBlockHandle = writeBlock(indexBlockWriter);
// write footer
- Footer footer = new Footer(bloomFilterHandle, indexBlockHandle);
- MemorySlice footerEncoding = Footer.writeFooter(footer);
+ SortLookupStoreFooter footer =
+ new SortLookupStoreFooter(bloomFilterHandle, indexBlockHandle);
+ MemorySlice footerEncoding = SortLookupStoreFooter.writeFooter(footer);
writeSlice(footerEncoding);
// do not need to close outputStream, since it will be closed by outer
classes
@@ -218,8 +223,7 @@ public class SstFileWriter implements Closeable {
LOG.info("totalCompressedSize: {}",
MemorySize.ofBytes(totalCompressedSize));
}
- private void writeSlice(MemorySlice slice) throws IOException {
+ public void writeSlice(MemorySlice slice) throws IOException {
out.write(slice.getHeapMemory(), slice.offset(), slice.length());
- position += slice.length();
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
index f05bc6bcca..d6feca60ac 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
@@ -26,6 +26,9 @@ import org.apache.paimon.io.cache.CacheKey;
import org.apache.paimon.io.cache.CacheKey.PositionCacheKey;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.sst.BloomFilterHandle;
+
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
@@ -58,6 +61,24 @@ public class FileBasedBloomFilter implements Closeable {
this.cacheKey = CacheKey.forPosition(filePath, readOffset, readLength,
true);
}
+ @Nullable
+ public static FileBasedBloomFilter create(
+ SeekableInputStream input,
+ Path filePath,
+ CacheManager cacheManager,
+ @Nullable BloomFilterHandle bloomFilterHandle) {
+ if (bloomFilterHandle == null) {
+ return null;
+ }
+ return new FileBasedBloomFilter(
+ input,
+ filePath,
+ cacheManager,
+ bloomFilterHandle.expectedEntries(),
+ bloomFilterHandle.offset(),
+ bloomFilterHandle.size());
+ }
+
public boolean testHash(int hash) {
accessCount++;
// we should refresh cache in LRU, but we cannot refresh everytime, it
is costly.
diff --git a/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreTest.java
similarity index 87%
rename from paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java
rename to paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreTest.java
index 7e5642d333..235954e33d 100644
--- a/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.sst;
+package org.apache.paimon.lookup.sort;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.CompressOptions;
@@ -29,6 +29,10 @@ import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceOutput;
import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.sst.BlockEntry;
+import org.apache.paimon.sst.BlockIterator;
+import org.apache.paimon.sst.SstFileReader;
+import org.apache.paimon.sst.SstFileWriter;
import
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
import org.apache.paimon.testutils.junit.parameterized.Parameters;
import org.apache.paimon.utils.BloomFilter;
@@ -41,6 +45,7 @@ import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
@@ -50,8 +55,9 @@ import java.util.UUID;
/** Test for {@link SstFileReader} and {@link SstFileWriter}. */
@ExtendWith(ParameterizedTestExtension.class)
-public class SstFileTest {
- private static final Logger LOG =
LoggerFactory.getLogger(SstFileTest.class);
+public class SortLookupStoreTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SortLookupStoreTest.class);
// 256 records per block
private static final int BLOCK_SIZE = (10) * 256;
@@ -63,9 +69,9 @@ public class SstFileTest {
private FileIO fileIO;
private Path file;
- private Path parent;
+ private File localFile;
- public SstFileTest(List<Object> var) {
+ public SortLookupStoreTest(List<Object> var) {
this.bloomFilterEnabled = (Boolean) var.get(0);
this.compress = new CompressOptions((String) var.get(1), 1);
}
@@ -85,8 +91,8 @@ public class SstFileTest {
@BeforeEach
public void beforeEach() {
this.fileIO = LocalFileIO.create();
- this.parent = new Path(tempPath.toUri());
this.file = new Path(new Path(tempPath.toUri()),
UUID.randomUUID().toString());
+ this.localFile = new File(file.toUri().getPath());
}
@TestTemplate
@@ -95,16 +101,18 @@ public class SstFileTest {
innerTestLookup();
}
+ private SortLookupStoreReader newReader() throws IOException {
+ SeekableInputStream input = LocalFileIO.INSTANCE.newInputStream(file);
+ return new SortLookupStoreReader(
+ Comparator.comparingInt(slice -> slice.readInt(0)),
+ file,
+ localFile.length(),
+ input,
+ CACHE_MANAGER);
+ }
+
private void innerTestLookup() throws Exception {
- long fileSize = fileIO.getFileSize(file);
- try (SeekableInputStream inputStream = fileIO.newInputStream(file);
- SstFileReader reader =
- new SstFileReader(
- Comparator.comparingInt(slice ->
slice.readInt(0)),
- fileSize,
- file,
- inputStream,
- CACHE_MANAGER); ) {
+ try (SortLookupStoreReader reader = newReader()) {
Random random = new Random();
MemorySliceOutput keyOut = new MemorySliceOutput(4);
@@ -162,15 +170,7 @@ public class SstFileTest {
int recordNum = 20000;
writeData(recordNum, bloomFilterEnabled);
- long fileSize = fileIO.getFileSize(file);
- try (SeekableInputStream inputStream = fileIO.newInputStream(file);
- SstFileReader reader =
- new SstFileReader(
- Comparator.comparingInt(slice ->
slice.readInt(0)),
- fileSize,
- file,
- inputStream,
- CACHE_MANAGER); ) {
+ try (SortLookupStoreReader reader = newReader()) {
SstFileReader.SstFileIterator fileIterator =
reader.createIterator();
MemorySliceOutput keyOut = new MemorySliceOutput(4);
@@ -230,7 +230,8 @@ public class SstFileTest {
iterator.seekTo(keyOut.toSlice().getHeapMemory());
}
- private void interleaveLookup(SstFileReader reader, MemorySliceOutput
keyOut) throws Exception {
+ private void interleaveLookup(SortLookupStoreReader reader,
MemorySliceOutput keyOut)
+ throws Exception {
keyOut.reset();
keyOut.writeInt(0);
reader.lookup(keyOut.toSlice().getHeapMemory());
@@ -243,8 +244,8 @@ public class SstFileTest {
}
BlockCompressionFactory compressionFactory =
BlockCompressionFactory.create(compress);
try (PositionOutputStream outputStream = fileIO.newOutputStream(file,
true);
- SstFileWriter writer =
- new SstFileWriter(
+ SortLookupStoreWriter writer =
+ new SortLookupStoreWriter(
outputStream, BLOCK_SIZE, bloomFilter,
compressionFactory); ) {
MemorySliceOutput keyOut = new MemorySliceOutput(4);
MemorySliceOutput valueOut = new MemorySliceOutput(4);
diff --git a/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
index 06bfa71690..f17c436518 100644
--- a/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
/** Test for {@link BlockIterator}. */
public class BlockIteratorTest {
+
private static final int ROW_NUM = 10_000;
private static final Comparator<MemorySlice> COMPARATOR =
Comparator.comparingInt(slice -> slice.readInt(0));