This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c869d9022e [core] Refactor Index Reader in BlockReader (#6865)
c869d9022e is described below
commit c869d9022ec9f98fa341932b579212a2a1dd0dbe
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Dec 23 13:21:15 2025 +0800
[core] Refactor Index Reader in BlockReader (#6865)
---
.../java/org/apache/paimon/sst/BlockIterator.java | 41 ++++++-----------
.../java/org/apache/paimon/sst/BlockReader.java | 51 +++++++++++++++-------
.../java/org/apache/paimon/sst/SstFileReader.java | 11 ++---
.../org/apache/paimon/sst/BlockIteratorTest.java | 2 +-
4 files changed, 56 insertions(+), 49 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java
index 65913957e3..f61e7d8d0a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java
@@ -21,33 +21,25 @@ package org.apache.paimon.sst;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceInput;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
-import static java.util.Objects.requireNonNull;
-
/** An {@link Iterator} for a block. */
-public abstract class BlockIterator implements Iterator<Map.Entry<MemorySlice,
MemorySlice>> {
-
- protected final MemorySliceInput data;
-
- private final int recordCount;
- private final Comparator<MemorySlice> comparator;
+public class BlockIterator implements Iterator<Map.Entry<MemorySlice,
MemorySlice>> {
+ private final BlockReader reader;
+ private final MemorySliceInput input;
private BlockEntry polled;
- public BlockIterator(
- MemorySliceInput data, int recordCount, Comparator<MemorySlice>
comparator) {
- this.data = data;
- this.recordCount = recordCount;
- this.comparator = comparator;
+ public BlockIterator(BlockReader reader) {
+ this.reader = reader;
+ this.input = reader.blockInput();
}
@Override
public boolean hasNext() {
- return polled != null || data.isReadable();
+ return polled != null || input.isReadable();
}
@Override
@@ -72,14 +64,14 @@ public abstract class BlockIterator implements Iterator<Map.Entry<MemorySlice, M
public boolean seekTo(MemorySlice targetKey) {
int left = 0;
- int right = recordCount - 1;
+ int right = reader.recordCount() - 1;
while (left <= right) {
int mid = left + (right - left) / 2;
- seekTo(mid);
+ input.setPosition(reader.seekTo(mid));
BlockEntry midEntry = readEntry();
- int compare = comparator.compare(midEntry.getKey(), targetKey);
+ int compare = reader.comparator().compare(midEntry.getKey(),
targetKey);
if (compare == 0) {
polled = midEntry;
@@ -96,18 +88,13 @@ public abstract class BlockIterator implements Iterator<Map.Entry<MemorySlice, M
return false;
}
- /** Seek to the specified record position of current block. */
- public abstract void seekTo(int recordPosition);
-
private BlockEntry readEntry() {
- requireNonNull(data, "data is null");
-
int keyLength;
- keyLength = data.readVarLenInt();
- MemorySlice key = data.readSlice(keyLength);
+ keyLength = input.readVarLenInt();
+ MemorySlice key = input.readSlice(keyLength);
- int valueLength = data.readVarLenInt();
- MemorySlice value = data.readSlice(valueLength);
+ int valueLength = input.readVarLenInt();
+ MemorySlice value = input.readSlice(valueLength);
return new BlockEntry(key, value);
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java
index d7596bafa4..caee5884d5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java
@@ -19,69 +19,88 @@
package org.apache.paimon.sst;
import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
import java.util.Comparator;
import static org.apache.paimon.sst.BlockAlignedType.ALIGNED;
/** Reader for a block. */
-public class BlockReader {
+public abstract class BlockReader {
+
private final MemorySlice block;
+ private final int recordCount;
private final Comparator<MemorySlice> comparator;
- public BlockReader(MemorySlice block, Comparator<MemorySlice> comparator) {
+ private BlockReader(MemorySlice block, int recordCount,
Comparator<MemorySlice> comparator) {
this.block = block;
+ this.recordCount = recordCount;
this.comparator = comparator;
}
- public long size() {
- return block.length();
+ public MemorySliceInput blockInput() {
+ return block.toInput();
+ }
+
+ public int recordCount() {
+ return recordCount;
+ }
+
+ public Comparator<MemorySlice> comparator() {
+ return comparator;
}
public BlockIterator iterator() {
+ return new BlockIterator(this);
+ }
+
+ /** Seek to slice position from record position. */
+ public abstract int seekTo(int recordPosition);
+
+ public static BlockReader create(MemorySlice block,
Comparator<MemorySlice> comparator) {
BlockAlignedType alignedType =
BlockAlignedType.fromByte(block.readByte(block.length() - 1));
int intValue = block.readInt(block.length() - 5);
if (alignedType == ALIGNED) {
- return new AlignedIterator(block.slice(0, block.length() - 5),
intValue, comparator);
+ return new AlignedBlockReader(block.slice(0, block.length() - 5),
intValue, comparator);
} else {
int indexLength = intValue * 4;
int indexOffset = block.length() - 5 - indexLength;
MemorySlice data = block.slice(0, indexOffset);
MemorySlice index = block.slice(indexOffset, indexLength);
- return new UnalignedIterator(data, index, comparator);
+ return new UnalignedBlockReader(data, index, comparator);
}
}
- private static class AlignedIterator extends BlockIterator {
+ private static class AlignedBlockReader extends BlockReader {
private final int recordSize;
- public AlignedIterator(
+ public AlignedBlockReader(
MemorySlice data, int recordSize, Comparator<MemorySlice>
comparator) {
- super(data.toInput(), data.length() / recordSize, comparator);
+ super(data, data.length() / recordSize, comparator);
this.recordSize = recordSize;
}
@Override
- public void seekTo(int recordPosition) {
- data.setPosition(recordPosition * recordSize);
+ public int seekTo(int recordPosition) {
+ return recordPosition * recordSize;
}
}
- private static class UnalignedIterator extends BlockIterator {
+ private static class UnalignedBlockReader extends BlockReader {
private final MemorySlice index;
- public UnalignedIterator(
+ public UnalignedBlockReader(
MemorySlice data, MemorySlice index, Comparator<MemorySlice>
comparator) {
- super(data.toInput(), index.length() / 4, comparator);
+ super(data, index.length() / 4, comparator);
this.index = index;
}
@Override
- public void seekTo(int recordPosition) {
- data.setPosition(index.readInt(recordPosition * 4));
+ public int seekTo(int recordPosition) {
+ return index.readInt(recordPosition * 4);
}
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
index 9184a5d9dd..4571bf644c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
@@ -48,7 +48,7 @@ public class SstFileReader implements Closeable {
private final Comparator<MemorySlice> comparator;
private final Path filePath;
private final BlockCache blockCache;
- private final BlockIterator indexBlockIterator;
+ private final BlockReader indexBlock;
@Nullable private final FileBasedBloomFilter bloomFilter;
public SstFileReader(
@@ -65,7 +65,7 @@ public class SstFileReader implements Closeable {
blockCache.getBlock(
fileSize - Footer.ENCODED_LENGTH,
Footer.ENCODED_LENGTH, b -> b, true);
Footer footer =
Footer.readFooter(MemorySlice.wrap(footerData).toInput());
- this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(),
true).iterator();
+ this.indexBlock = readBlock(footer.getIndexBlockHandle(), true);
BloomFilterHandle handle = footer.getBloomFilterHandle();
if (handle == null) {
this.bloomFilter = null;
@@ -95,12 +95,13 @@ public class SstFileReader implements Closeable {
MemorySlice keySlice = MemorySlice.wrap(key);
// seek the index to the block containing the key
+ BlockIterator indexBlockIterator = indexBlock.iterator();
indexBlockIterator.seekTo(keySlice);
// if indexIterator does not have a next, it means the key does not exist in this iterator
if (indexBlockIterator.hasNext()) {
// seek the current iterator to the key
- BlockIterator current = getNextBlock();
+ BlockIterator current = getNextBlock(indexBlockIterator);
if (current.seekTo(keySlice)) {
return current.next().getValue().copyBytes();
}
@@ -108,7 +109,7 @@ public class SstFileReader implements Closeable {
return null;
}
- private BlockIterator getNextBlock() {
+ private BlockIterator getNextBlock(BlockIterator indexBlockIterator) {
// index block handle, point to the key, value position.
MemorySlice blockHandle = indexBlockIterator.next().getValue();
BlockReader dataBlock =
@@ -138,7 +139,7 @@ public class SstFileReader implements Closeable {
blockHandle.size(),
bytes -> decompressBlock(bytes, blockTrailer),
index);
- return new BlockReader(MemorySlice.wrap(unCompressedBlock),
comparator);
+ return BlockReader.create(MemorySlice.wrap(unCompressedBlock),
comparator);
}
private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer
blockTrailer) {
diff --git a/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
index 004b920a4e..06bfa71690 100644
--- a/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java
@@ -46,7 +46,7 @@ public class BlockIteratorTest {
public void innerTest(boolean aligned) throws IOException {
MemorySlice data = writeBlock(aligned);
- BlockIterator iterator = new BlockReader(data, COMPARATOR).iterator();
+ BlockIterator iterator = BlockReader.create(data,
COMPARATOR).iterator();
// 1. test for normal cases:
final int step = 3;