This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 039046a0d4 [core] Extract decompressBlock method in SortLookupStoreReader
039046a0d4 is described below

commit 039046a0d4d4aa4195f9187b2d0214f277316ce8
Author: Jingsong <[email protected]>
AuthorDate: Mon Dec 2 22:52:10 2024 +0800

    [core] Extract decompressBlock method in SortLookupStoreReader
---
 .../paimon/lookup/sort/SortLookupStoreReader.java  | 65 +++++++++++-----------
 1 file changed, 32 insertions(+), 33 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
index 39997888ce..6dbfe130e3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
@@ -106,7 +106,7 @@ public class SortLookupStoreReader implements LookupStoreReader {
         return null;
     }
 
-    private BlockIterator getNextBlock() throws IOException {
+    private BlockIterator getNextBlock() {
         // index block handle, point to the key, value position.
         MemorySlice blockHandle = indexBlockIterator.next().getValue();
         BlockReader dataBlock =
@@ -134,42 +134,41 @@ public class SortLookupStoreReader implements LookupStoreReader {
                 blockCache.getBlock(
                         blockHandle.offset(),
                         blockHandle.size(),
-                        bytes -> {
-                            MemorySegment block = MemorySegment.wrap(bytes);
-                            int crc32cCode = crc32c(block, 
blockTrailer.getCompressionType());
-                            checkArgument(
-                                    blockTrailer.getCrc32c() == crc32cCode,
-                                    String.format(
-                                            "Expected CRC32C(%d) but found 
CRC32C(%d) for file(%s)",
-                                            blockTrailer.getCrc32c(), 
crc32cCode, filePath));
-
-                            // decompress data
-                            BlockCompressionFactory compressionFactory =
-                                    BlockCompressionFactory.create(
-                                            blockTrailer.getCompressionType());
-                            if (compressionFactory == null) {
-                                return bytes;
-                            } else {
-                                MemorySliceInput compressedInput =
-                                        MemorySlice.wrap(block).toInput();
-                                byte[] uncompressed = new 
byte[compressedInput.readVarLenInt()];
-                                BlockDecompressor decompressor =
-                                        compressionFactory.getDecompressor();
-                                int uncompressedLength =
-                                        decompressor.decompress(
-                                                block.getHeapMemory(),
-                                                compressedInput.position(),
-                                                compressedInput.available(),
-                                                uncompressed,
-                                                0);
-                                checkArgument(uncompressedLength == 
uncompressed.length);
-                                return uncompressed;
-                            }
-                        },
+                        bytes -> decompressBlock(bytes, blockTrailer),
                         index);
         return new BlockReader(MemorySlice.wrap(unCompressedBlock), 
comparator);
     }
 
+    private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer 
blockTrailer) {
+        MemorySegment compressed = MemorySegment.wrap(compressedBytes);
+        int crc32cCode = crc32c(compressed, blockTrailer.getCompressionType());
+        checkArgument(
+                blockTrailer.getCrc32c() == crc32cCode,
+                String.format(
+                        "Expected CRC32C(%d) but found CRC32C(%d) for 
file(%s)",
+                        blockTrailer.getCrc32c(), crc32cCode, filePath));
+
+        // decompress data
+        BlockCompressionFactory compressionFactory =
+                
BlockCompressionFactory.create(blockTrailer.getCompressionType());
+        if (compressionFactory == null) {
+            return compressedBytes;
+        } else {
+            MemorySliceInput compressedInput = 
MemorySlice.wrap(compressed).toInput();
+            byte[] uncompressed = new byte[compressedInput.readVarLenInt()];
+            BlockDecompressor decompressor = 
compressionFactory.getDecompressor();
+            int uncompressedLength =
+                    decompressor.decompress(
+                            compressed.getHeapMemory(),
+                            compressedInput.position(),
+                            compressedInput.available(),
+                            uncompressed,
+                            0);
+            checkArgument(uncompressedLength == uncompressed.length);
+            return uncompressed;
+        }
+    }
+
     @Override
     public void close() throws IOException {
         if (bloomFilter != null) {

Reply via email to