This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 039046a0d4 [core] Extract decompressBlock method in
SortLookupStoreReader
039046a0d4 is described below
commit 039046a0d4d4aa4195f9187b2d0214f277316ce8
Author: Jingsong <[email protected]>
AuthorDate: Mon Dec 2 22:52:10 2024 +0800
[core] Extract decompressBlock method in SortLookupStoreReader
---
.../paimon/lookup/sort/SortLookupStoreReader.java | 65 +++++++++++-----------
1 file changed, 32 insertions(+), 33 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
index 39997888ce..6dbfe130e3 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
@@ -106,7 +106,7 @@ public class SortLookupStoreReader implements
LookupStoreReader {
return null;
}
- private BlockIterator getNextBlock() throws IOException {
+ private BlockIterator getNextBlock() {
// index block handle, point to the key, value position.
MemorySlice blockHandle = indexBlockIterator.next().getValue();
BlockReader dataBlock =
@@ -134,42 +134,41 @@ public class SortLookupStoreReader implements
LookupStoreReader {
blockCache.getBlock(
blockHandle.offset(),
blockHandle.size(),
- bytes -> {
- MemorySegment block = MemorySegment.wrap(bytes);
- int crc32cCode = crc32c(block,
blockTrailer.getCompressionType());
- checkArgument(
- blockTrailer.getCrc32c() == crc32cCode,
- String.format(
- "Expected CRC32C(%d) but found
CRC32C(%d) for file(%s)",
- blockTrailer.getCrc32c(),
crc32cCode, filePath));
-
- // decompress data
- BlockCompressionFactory compressionFactory =
- BlockCompressionFactory.create(
- blockTrailer.getCompressionType());
- if (compressionFactory == null) {
- return bytes;
- } else {
- MemorySliceInput compressedInput =
- MemorySlice.wrap(block).toInput();
- byte[] uncompressed = new
byte[compressedInput.readVarLenInt()];
- BlockDecompressor decompressor =
- compressionFactory.getDecompressor();
- int uncompressedLength =
- decompressor.decompress(
- block.getHeapMemory(),
- compressedInput.position(),
- compressedInput.available(),
- uncompressed,
- 0);
- checkArgument(uncompressedLength ==
uncompressed.length);
- return uncompressed;
- }
- },
+ bytes -> decompressBlock(bytes, blockTrailer),
index);
return new BlockReader(MemorySlice.wrap(unCompressedBlock),
comparator);
}
+ private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer
blockTrailer) {
+ MemorySegment compressed = MemorySegment.wrap(compressedBytes);
+ int crc32cCode = crc32c(compressed, blockTrailer.getCompressionType());
+ checkArgument(
+ blockTrailer.getCrc32c() == crc32cCode,
+ String.format(
+ "Expected CRC32C(%d) but found CRC32C(%d) for
file(%s)",
+ blockTrailer.getCrc32c(), crc32cCode, filePath));
+
+ // decompress data
+ BlockCompressionFactory compressionFactory =
+
BlockCompressionFactory.create(blockTrailer.getCompressionType());
+ if (compressionFactory == null) {
+ return compressedBytes;
+ } else {
+ MemorySliceInput compressedInput =
MemorySlice.wrap(compressed).toInput();
+ byte[] uncompressed = new byte[compressedInput.readVarLenInt()];
+ BlockDecompressor decompressor =
compressionFactory.getDecompressor();
+ int uncompressedLength =
+ decompressor.decompress(
+ compressed.getHeapMemory(),
+ compressedInput.position(),
+ compressedInput.available(),
+ uncompressed,
+ 0);
+ checkArgument(uncompressedLength == uncompressed.length);
+ return uncompressed;
+ }
+ }
+
@Override
public void close() throws IOException {
if (bloomFilter != null) {