This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d33b8711fc [core] Optimization of Parquet Predicate Pushdown 
Capability (#4608)
d33b8711fc is described below

commit d33b8711fc6b4e1f35ba7d85336be4ff3baa956d
Author: aiden.dong <[email protected]>
AuthorDate: Tue Dec 3 13:33:44 2024 +0800

    [core] Optimization of Parquet Predicate Pushdown Capability (#4608)
---
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  63 +++++++
 .../format/parquet/ParquetReaderFactory.java       |  66 ++++++-
 .../parquet/reader/AbstractColumnReader.java       | 204 +++++++++++++++------
 .../format/parquet/reader/BooleanColumnReader.java |  36 +++-
 .../format/parquet/reader/ByteColumnReader.java    |  39 +++-
 .../format/parquet/reader/BytesColumnReader.java   |  41 ++++-
 .../format/parquet/reader/DoubleColumnReader.java  |  38 +++-
 .../parquet/reader/FixedLenBytesColumnReader.java  |  36 +++-
 .../format/parquet/reader/FloatColumnReader.java   |  38 +++-
 .../format/parquet/reader/IntColumnReader.java     |  39 +++-
 .../format/parquet/reader/LongColumnReader.java    |  39 +++-
 .../format/parquet/reader/NestedColumnReader.java  |   2 +-
 .../reader/NestedPrimitiveColumnReader.java        | 141 +++++++++-----
 .../format/parquet/reader/ParquetReadState.java    | 148 +++++++++++++++
 .../parquet/reader/ParquetSplitReaderUtil.java     |  41 ++---
 .../format/parquet/reader/RunLengthDecoder.java    |  45 +++++
 .../format/parquet/reader/ShortColumnReader.java   |  38 +++-
 .../parquet/reader/TimestampColumnReader.java      |  15 +-
 18 files changed, 898 insertions(+), 171 deletions(-)

diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 46b85223bc..e80b49a0f0 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -84,6 +84,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
@@ -809,6 +810,68 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 
"1|4|500|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
+    @Test
+    public void testDeletionVectorsWithParquetFilter() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(BUCKET, 1);
+                            conf.set(DELETION_VECTORS_ENABLED, true);
+                            conf.set(FILE_FORMAT, "parquet");
+                            conf.set("parquet.block.size", "1048576");
+                            conf.set("parquet.page.size", "1024");
+                        });
+
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+        BatchTableWrite write =
+                (BatchTableWrite)
+                        writeBuilder
+                                .newWrite()
+                                .withIOManager(new 
IOManagerImpl(tempDir.toString()));
+
+        for (int i = 0; i < 200000; i++) {
+            write.write(rowData(1, i, i * 100L));
+        }
+
+        List<CommitMessage> messages = write.prepareCommit();
+        BatchTableCommit commit = writeBuilder.newCommit();
+        commit.commit(messages);
+        write =
+                (BatchTableWrite)
+                        writeBuilder
+                                .newWrite()
+                                .withIOManager(new 
IOManagerImpl(tempDir.toString()));
+        for (int i = 180000; i < 200000; i++) {
+            write.write(rowDataWithKind(RowKind.DELETE, 1, i, i * 100L));
+        }
+
+        messages = write.prepareCommit();
+        commit = writeBuilder.newCommit();
+        commit.commit(messages);
+
+        PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+        List<Split> splits = 
toSplits(table.newSnapshotReader().read().dataSplits());
+        Random random = new Random();
+
+        for (int i = 0; i < 10; i++) {
+            int value = random.nextInt(180000);
+            TableRead read = table.newRead().withFilter(builder.equal(1, 
value)).executeFilter();
+            assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
+                    .isEqualTo(
+                            Arrays.asList(
+                                    String.format(
+                                            
"%d|%d|%d|binary|varbinary|mapKey:mapVal|multiset",
+                                            1, value, value * 100L)));
+        }
+
+        for (int i = 0; i < 10; i++) {
+            int value = 180000 + random.nextInt(20000);
+            TableRead read = table.newRead().withFilter(builder.equal(1, 
value)).executeFilter();
+            assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)).isEmpty();
+        }
+    }
+
     @Test
     public void testDeletionVectorsWithFileIndexInMeta() throws Exception {
         FileStoreTable table =
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index f0151d6f3d..0c99653120 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -28,6 +28,7 @@ import 
org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.parquet.reader.ColumnReader;
 import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
+import org.apache.paimon.format.parquet.reader.ParquetReadState;
 import org.apache.paimon.format.parquet.reader.ParquetTimestampVector;
 import org.apache.paimon.format.parquet.type.ParquetField;
 import org.apache.paimon.fs.Path;
@@ -130,7 +131,7 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
                 buildFieldsList(projectedType.getFields(), 
projectedType.getFieldNames(), columnIO);
 
         return new ParquetReader(
-                reader, requestedSchema, reader.getRecordCount(), 
poolOfBatches, fields);
+                reader, requestedSchema, reader.getFilteredRecordCount(), 
poolOfBatches, fields);
     }
 
     private void setReadOptions(ParquetReadOptions.Builder builder) {
@@ -336,6 +337,10 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
 
         private long nextRowPosition;
 
+        private ParquetReadState currentRowGroupReadState;
+
+        private long currentRowGroupFirstRowIndex;
+
         /**
          * For each request column, the reader to read this column. This is 
NULL if this column is
          * missing from the file, in which case we populate the attribute with 
NULL.
@@ -359,6 +364,7 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
             this.totalCountLoadedSoFar = 0;
             this.currentRowPosition = 0;
             this.nextRowPosition = 0;
+            this.currentRowGroupFirstRowIndex = 0;
             this.fields = fields;
         }
 
@@ -390,7 +396,8 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
                 currentRowPosition = nextRowPosition;
             }
 
-            int num = (int) Math.min(batchSize, totalCountLoadedSoFar - 
rowsReturned);
+            int num = getBachSize();
+
             for (int i = 0; i < columnReaders.length; ++i) {
                 if (columnReaders[i] == null) {
                     batch.writableVectors[i].fillWithNulls();
@@ -400,13 +407,13 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
                 }
             }
             rowsReturned += num;
-            nextRowPosition = currentRowPosition + num;
+            nextRowPosition = getNextRowPosition(num);
             batch.columnarBatch.setNumRows(num);
             return true;
         }
 
         private void readNextRowGroup() throws IOException {
-            PageReadStore rowGroup = reader.readNextRowGroup();
+            PageReadStore rowGroup = reader.readNextFilteredRowGroup();
             if (rowGroup == null) {
                 throw new IOException(
                         "expecting more rows but reached last block. Read "
@@ -415,6 +422,9 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
                                 + totalRowCount);
             }
 
+            this.currentRowGroupReadState =
+                    new 
ParquetReadState(rowGroup.getRowIndexes().orElse(null));
+
             List<Type> types = requestedSchema.getFields();
             columnReaders = new ColumnReader[types.size()];
             for (int i = 0; i < types.size(); ++i) {
@@ -429,18 +439,62 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
                                     0);
                 }
             }
+
             totalCountLoadedSoFar += rowGroup.getRowCount();
-            if (rowGroup.getRowIndexOffset().isPresent()) {
-                currentRowPosition = rowGroup.getRowIndexOffset().get();
+
+            if (rowGroup.getRowIndexOffset().isPresent()) { // filter
+                currentRowGroupFirstRowIndex = 
rowGroup.getRowIndexOffset().get();
+                long pageIndex = 0;
+                if (!this.currentRowGroupReadState.isMaxRange()) {
+                    pageIndex = 
this.currentRowGroupReadState.currentRangeStart();
+                }
+                currentRowPosition = currentRowGroupFirstRowIndex + pageIndex;
             } else {
                 if (reader.rowGroupsFiltered()) {
                     throw new RuntimeException(
                             "There is a bug, rowIndexOffset must be present 
when row groups are filtered.");
                 }
+                currentRowGroupFirstRowIndex = nextRowPosition;
                 currentRowPosition = nextRowPosition;
             }
         }
 
+        private int getBachSize() throws IOException {
+
+            long rangeBatchSize = Long.MAX_VALUE;
+            if (this.currentRowGroupReadState.isFinished()) {
+                throw new IOException(
+                        "expecting more rows but reached last page block. Read 
"
+                                + rowsReturned
+                                + " out of "
+                                + totalRowCount);
+            } else if (!this.currentRowGroupReadState.isMaxRange()) {
+                long pageIndex = this.currentRowPosition - 
this.currentRowGroupFirstRowIndex;
+                rangeBatchSize = 
this.currentRowGroupReadState.currentRangeEnd() - pageIndex + 1;
+            }
+
+            return (int)
+                    Math.min(
+                            batchSize,
+                            Math.min(rangeBatchSize, totalCountLoadedSoFar - 
rowsReturned));
+        }
+
+        private long getNextRowPosition(int num) {
+            if (this.currentRowGroupReadState.isMaxRange()) {
+                return this.currentRowPosition + num;
+            } else {
+                long pageIndex = this.currentRowPosition - 
this.currentRowGroupFirstRowIndex;
+                long nextIndex = pageIndex + num;
+
+                if (this.currentRowGroupReadState.currentRangeEnd() < 
nextIndex) {
+                    this.currentRowGroupReadState.nextRange();
+                    nextIndex = 
this.currentRowGroupReadState.currentRangeStart();
+                }
+
+                return nextIndex;
+            }
+        }
+
         private ParquetReaderBatch getCachedEntry() throws IOException {
             try {
                 return pool.pollEntry();
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java
index 7e2ab6d5e7..5e3f4a7e6a 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java
@@ -32,6 +32,7 @@ import org.apache.parquet.column.page.DataPage;
 import org.apache.parquet.column.page.DataPageV1;
 import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -65,20 +66,16 @@ public abstract class AbstractColumnReader<VECTOR extends 
WritableColumnVector>
 
     protected final ColumnDescriptor descriptor;
 
-    /** Total number of values read. */
-    private long valuesRead;
-
-    /**
-     * value that indicates the end of the current page. That is, if 
valuesRead ==
-     * endOfPageValueCount, we are at the end of the page.
-     */
-    private long endOfPageValueCount;
-
     /** If true, the current page is dictionary encoded. */
     private boolean isCurrentPageDictionaryEncoded;
 
     /** Total values in the current page. */
-    private int pageValueCount;
+    //    private int pageValueCount;
+
+    /**
+     * Helper struct to track intermediate states while reading Parquet pages 
in the column chunk.
+     */
+    private final ParquetReadState readState;
 
     /*
      * Input streams:
@@ -101,12 +98,14 @@ public abstract class AbstractColumnReader<VECTOR extends 
WritableColumnVector>
     /** Dictionary decoder to wrap dictionary ids input stream. */
     private RunLengthDecoder dictionaryIdsDecoder;
 
-    public AbstractColumnReader(ColumnDescriptor descriptor, PageReader 
pageReader)
+    public AbstractColumnReader(ColumnDescriptor descriptor, PageReadStore 
pageReadStore)
             throws IOException {
         this.descriptor = descriptor;
-        this.pageReader = pageReader;
+        this.pageReader = pageReadStore.getPageReader(descriptor);
         this.maxDefLevel = descriptor.getMaxDefinitionLevel();
 
+        this.readState = new 
ParquetReadState(pageReadStore.getRowIndexes().orElse(null));
+
         DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
         if (dictionaryPage != null) {
             try {
@@ -147,56 +146,136 @@ public abstract class AbstractColumnReader<VECTOR 
extends WritableColumnVector>
         if (dictionary != null) {
             dictionaryIds = vector.reserveDictionaryIds(readNumber);
         }
-        while (readNumber > 0) {
+
+        readState.resetForNewBatch(readNumber);
+
+        while (readState.rowsToReadInBatch > 0) {
             // Compute the number of values we want to read in this page.
-            int leftInPage = (int) (endOfPageValueCount - valuesRead);
-            if (leftInPage == 0) {
-                DataPage page = pageReader.readPage();
-                if (page instanceof DataPageV1) {
-                    readPageV1((DataPageV1) page);
-                } else if (page instanceof DataPageV2) {
-                    readPageV2((DataPageV2) page);
-                } else {
-                    throw new RuntimeException("Unsupported page type: " + 
page.getClass());
+            if (readState.valuesToReadInPage == 0) {
+                int pageValueCount = readPage();
+                if (pageValueCount < 0) {
+                    // we've read all the pages; this could happen when we're 
reading a repeated
+                    // list and we
+                    // don't know where the list will end until we've seen all 
the pages.
+                    break;
                 }
-                leftInPage = (int) (endOfPageValueCount - valuesRead);
             }
-            int num = Math.min(readNumber, leftInPage);
-            if (isCurrentPageDictionaryEncoded) {
-                // Read and decode dictionary ids.
-                runLenDecoder.readDictionaryIds(
-                        num, dictionaryIds, vector, rowId, maxDefLevel, 
this.dictionaryIdsDecoder);
-
-                if (vector.hasDictionary() || (rowId == 0 && 
supportLazyDecode())) {
-                    // Column vector supports lazy decoding of dictionary 
values so just set the
-                    // dictionary.
-                    // We can't do this if rowId != 0 AND the column doesn't 
have a dictionary (i.e.
-                    // some
-                    // non-dictionary encoded values have already been added).
-                    vector.setDictionary(new ParquetDictionary(dictionary));
+
+            if (readState.isFinished()) {
+                break;
+            }
+
+            long pageRowId = readState.rowId;
+            int leftInBatch = readState.rowsToReadInBatch;
+            int leftInPage = readState.valuesToReadInPage;
+
+            int readBatch = Math.min(leftInBatch, leftInPage);
+
+            long rangeStart = readState.currentRangeStart();
+            long rangeEnd = readState.currentRangeEnd();
+
+            if (pageRowId < rangeStart) {
+                int toSkip = (int) (rangeStart - pageRowId);
+                if (toSkip >= leftInPage) { // drop page
+                    pageRowId += leftInPage;
+                    leftInPage = 0;
                 } else {
-                    readBatchFromDictionaryIds(rowId, num, vector, 
dictionaryIds);
+                    if (isCurrentPageDictionaryEncoded) {
+                        runLenDecoder.skipDictionaryIds(
+                                toSkip, maxDefLevel, 
this.dictionaryIdsDecoder);
+                        pageRowId += toSkip;
+                        leftInPage -= toSkip;
+                    } else {
+                        skipBatch(toSkip);
+                        pageRowId += toSkip;
+                        leftInPage -= toSkip;
+                    }
                 }
+            } else if (pageRowId > rangeEnd) {
+                readState.nextRange();
             } else {
-                if (vector.hasDictionary() && rowId != 0) {
-                    // This batch already has dictionary encoded values but 
this new page is not.
-                    // The batch
-                    // does not support a mix of dictionary and not so we will 
decode the
-                    // dictionary.
-                    readBatchFromDictionaryIds(0, rowId, vector, 
vector.getDictionaryIds());
+                long start = pageRowId;
+                long end = Math.min(rangeEnd, pageRowId + readBatch - 1);
+                int num = (int) (end - start + 1);
+
+                if (isCurrentPageDictionaryEncoded) {
+                    // Read and decode dictionary ids.
+                    runLenDecoder.readDictionaryIds(
+                            num,
+                            dictionaryIds,
+                            vector,
+                            rowId,
+                            maxDefLevel,
+                            this.dictionaryIdsDecoder);
+
+                    if (vector.hasDictionary() || (rowId == 0 && 
supportLazyDecode())) {
+                        // Column vector supports lazy decoding of dictionary 
values so just set the
+                        // dictionary.
+                        // We can't do this if rowId != 0 AND the column 
doesn't have a dictionary
+                        // (i.e.
+                        // some
+                        // non-dictionary encoded values have already been 
added).
+                        vector.setDictionary(new 
ParquetDictionary(dictionary));
+                    } else {
+                        readBatchFromDictionaryIds(rowId, num, vector, 
dictionaryIds);
+                    }
+                } else {
+                    if (vector.hasDictionary() && rowId != 0) {
+                        // This batch already has dictionary encoded values 
but this new page is
+                        // not.
+                        // The batch
+                        // does not support a mix of dictionary and not so we 
will decode the
+                        // dictionary.
+                        readBatchFromDictionaryIds(0, rowId, vector, 
vector.getDictionaryIds());
+                    }
+                    vector.setDictionary(null);
+                    readBatch(rowId, num, vector);
                 }
-                vector.setDictionary(null);
-                readBatch(rowId, num, vector);
+                leftInBatch -= num;
+                pageRowId += num;
+                leftInPage -= num;
+                rowId += num;
             }
+            readState.rowsToReadInBatch = leftInBatch;
+            readState.valuesToReadInPage = leftInPage;
+            readState.rowId = pageRowId;
+        }
+    }
 
-            valuesRead += num;
-            rowId += num;
-            readNumber -= num;
+    private int readPage() {
+        DataPage page = pageReader.readPage();
+        if (page == null) {
+            return -1;
         }
+        long pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);
+
+        int pageValueCount =
+                page.accept(
+                        new DataPage.Visitor<Integer>() {
+                            @Override
+                            public Integer visit(DataPageV1 dataPageV1) {
+                                try {
+                                    return readPageV1(dataPageV1);
+                                } catch (IOException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }
+
+                            @Override
+                            public Integer visit(DataPageV2 dataPageV2) {
+                                try {
+                                    return readPageV2(dataPageV2);
+                                } catch (IOException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                        });
+        readState.resetForNewPage(pageValueCount, pageFirstRowIndex);
+        return pageValueCount;
     }
 
-    private void readPageV1(DataPageV1 page) throws IOException {
-        this.pageValueCount = page.getValueCount();
+    private int readPageV1(DataPageV1 page) throws IOException {
+        int pageValueCount = page.getValueCount();
         ValuesReader rlReader = 
page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
 
         // Initialize the decoders.
@@ -211,30 +290,31 @@ public abstract class AbstractColumnReader<VECTOR extends 
WritableColumnVector>
             ByteBufferInputStream in = bytes.toInputStream();
             rlReader.initFromPage(pageValueCount, in);
             this.runLenDecoder.initFromStream(pageValueCount, in);
-            prepareNewPage(page.getValueEncoding(), in);
+            prepareNewPage(page.getValueEncoding(), in, pageValueCount);
+            return pageValueCount;
         } catch (IOException e) {
             throw new IOException("could not read page " + page + " in col " + 
descriptor, e);
         }
     }
 
-    private void readPageV2(DataPageV2 page) throws IOException {
-        this.pageValueCount = page.getValueCount();
+    private int readPageV2(DataPageV2 page) throws IOException {
+        int pageValueCount = page.getValueCount();
 
         int bitWidth = 
BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
         // do not read the length from the stream. v2 pages handle dividing 
the page bytes.
         this.runLenDecoder = new RunLengthDecoder(bitWidth, false);
         this.runLenDecoder.initFromStream(
-                this.pageValueCount, 
page.getDefinitionLevels().toInputStream());
+                pageValueCount, page.getDefinitionLevels().toInputStream());
         try {
-            prepareNewPage(page.getDataEncoding(), 
page.getData().toInputStream());
+            prepareNewPage(page.getDataEncoding(), 
page.getData().toInputStream(), pageValueCount);
+            return pageValueCount;
         } catch (IOException e) {
             throw new IOException("could not read page " + page + " in col " + 
descriptor, e);
         }
     }
 
-    private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream 
in)
+    private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream 
in, int pageValueCount)
             throws IOException {
-        this.endOfPageValueCount = valuesRead + pageValueCount;
         if (dataEncoding.usesDictionary()) {
             if (dictionary == null) {
                 throw new IOException(
@@ -269,6 +349,14 @@ public abstract class AbstractColumnReader<VECTOR extends 
WritableColumnVector>
         afterReadPage();
     }
 
+    final void skipDataBuffer(int length) {
+        try {
+            dataInputStream.skipFully(length);
+        } catch (IOException e) {
+            throw new ParquetDecodingException("Failed to skip " + length + " 
bytes", e);
+        }
+    }
+
     final ByteBuffer readDataBuffer(int length) {
         try {
             return 
dataInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN);
@@ -291,6 +379,8 @@ public abstract class AbstractColumnReader<VECTOR extends 
WritableColumnVector>
     /** Read batch from {@link #runLenDecoder} and {@link #dataInputStream}. */
     protected abstract void readBatch(int rowId, int num, VECTOR column);
 
+    protected abstract void skipBatch(int num);
+
     /**
      * Decode dictionary ids to data. From {@link #runLenDecoder} and {@link 
#dictionaryIdsDecoder}.
      */
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java
index d5dc231d84..83d3c5a07d 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java
@@ -22,7 +22,7 @@ import 
org.apache.paimon.data.columnar.writable.WritableBooleanVector;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.schema.PrimitiveType;
 
@@ -36,9 +36,9 @@ public class BooleanColumnReader extends 
AbstractColumnReader<WritableBooleanVec
 
     private byte currentByte = 0;
 
-    public BooleanColumnReader(ColumnDescriptor descriptor, PageReader 
pageReader)
+    public BooleanColumnReader(ColumnDescriptor descriptor, PageReadStore 
pageReadStore)
             throws IOException {
-        super(descriptor, pageReader);
+        super(descriptor, pageReadStore);
         checkTypeName(PrimitiveType.PrimitiveTypeName.BOOLEAN);
     }
 
@@ -94,6 +94,36 @@ public class BooleanColumnReader extends 
AbstractColumnReader<WritableBooleanVec
         }
     }
 
+    @Override
+    protected void skipBatch(int num) {
+        int left = num;
+        while (left > 0) {
+            if (runLenDecoder.currentCount == 0) {
+                runLenDecoder.readNextGroup();
+            }
+            int n = Math.min(left, runLenDecoder.currentCount);
+            switch (runLenDecoder.mode) {
+                case RLE:
+                    if (runLenDecoder.currentValue == maxDefLevel) {
+                        for (int i = 0; i < n; i++) {
+                            readBoolean();
+                        }
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; ++i) {
+                        if 
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+                                == maxDefLevel) {
+                            readBoolean();
+                        }
+                    }
+                    break;
+            }
+            left -= n;
+            runLenDecoder.currentCount -= n;
+        }
+    }
+
     private boolean readBoolean() {
         if (bitOffset == 0) {
             try {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java
index bed9923d9b..804b8bc027 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java
@@ -22,7 +22,7 @@ import 
org.apache.paimon.data.columnar.writable.WritableByteVector;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -31,8 +31,9 @@ import java.nio.ByteBuffer;
 /** Byte {@link ColumnReader}. Using INT32 to store byte, so just cast int to 
byte. */
 public class ByteColumnReader extends AbstractColumnReader<WritableByteVector> 
{
 
-    public ByteColumnReader(ColumnDescriptor descriptor, PageReader 
pageReader) throws IOException {
-        super(descriptor, pageReader);
+    public ByteColumnReader(ColumnDescriptor descriptor, PageReadStore 
pageReadStore)
+            throws IOException {
+        super(descriptor, pageReadStore);
         checkTypeName(PrimitiveType.PrimitiveTypeName.INT32);
     }
 
@@ -69,6 +70,38 @@ public class ByteColumnReader extends 
AbstractColumnReader<WritableByteVector> {
         }
     }
 
+    @Override
+    protected void skipBatch(int num) {
+        int left = num;
+        while (left > 0) {
+            if (runLenDecoder.currentCount == 0) {
+                runLenDecoder.readNextGroup();
+            }
+            int n = Math.min(left, runLenDecoder.currentCount);
+            switch (runLenDecoder.mode) {
+                case RLE:
+                    if (runLenDecoder.currentValue == maxDefLevel) {
+                        skipByte(n);
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; ++i) {
+                        if 
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+                                == maxDefLevel) {
+                            skipByte(1);
+                        }
+                    }
+                    break;
+            }
+            left -= n;
+            runLenDecoder.currentCount -= n;
+        }
+    }
+
+    private void skipByte(int num) {
+        skipDataBuffer(4 * num);
+    }
+
     @Override
     protected void readBatchFromDictionaryIds(
             int rowId, int num, WritableByteVector column, WritableIntVector 
dictionaryIds) {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java
index e83115c8a6..6ee395e585 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java
@@ -22,7 +22,7 @@ import 
org.apache.paimon.data.columnar.writable.WritableBytesVector;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -31,9 +31,9 @@ import java.nio.ByteBuffer;
 /** Bytes {@link ColumnReader}. A int length and bytes data. */
 public class BytesColumnReader extends 
AbstractColumnReader<WritableBytesVector> {
 
-    public BytesColumnReader(ColumnDescriptor descriptor, PageReader 
pageReader)
+    public BytesColumnReader(ColumnDescriptor descriptor, PageReadStore 
pageReadStore)
             throws IOException {
-        super(descriptor, pageReader);
+        super(descriptor, pageReadStore);
         checkTypeName(PrimitiveType.PrimitiveTypeName.BINARY);
     }
 
@@ -70,6 +70,41 @@ public class BytesColumnReader extends 
AbstractColumnReader<WritableBytesVector>
         }
     }
 
+    @Override
+    protected void skipBatch(int num) {
+        int left = num;
+        while (left > 0) {
+            if (runLenDecoder.currentCount == 0) {
+                runLenDecoder.readNextGroup();
+            }
+            int n = Math.min(left, runLenDecoder.currentCount);
+            switch (runLenDecoder.mode) {
+                case RLE:
+                    if (runLenDecoder.currentValue == maxDefLevel) {
+                        skipBinary(n);
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; ++i) {
+                        if 
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+                                == maxDefLevel) {
+                            skipBinary(1);
+                        }
+                    }
+                    break;
+            }
+            left -= n;
+            runLenDecoder.currentCount -= n;
+        }
+    }
+
+    private void skipBinary(int num) {
+        for (int i = 0; i < num; i++) {
+            int len = readDataBuffer(4).getInt();
+            skipDataBuffer(len);
+        }
+    }
+
     @Override
     protected void readBatchFromDictionaryIds(
             int rowId, int num, WritableBytesVector column, WritableIntVector 
dictionaryIds) {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java
index d6d8aa2bbb..2cffd40624 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java
@@ -22,7 +22,7 @@ import 
org.apache.paimon.data.columnar.writable.WritableDoubleVector;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -31,9 +31,9 @@ import java.nio.ByteBuffer;
 /** Double {@link ColumnReader}. */
 public class DoubleColumnReader extends 
AbstractColumnReader<WritableDoubleVector> {
 
-    public DoubleColumnReader(ColumnDescriptor descriptor, PageReader 
pageReader)
+    public DoubleColumnReader(ColumnDescriptor descriptor, PageReadStore 
pageReadStore)
             throws IOException {
-        super(descriptor, pageReader);
+        super(descriptor, pageReadStore);
         checkTypeName(PrimitiveType.PrimitiveTypeName.DOUBLE);
     }
 
@@ -70,6 +70,38 @@ public class DoubleColumnReader extends 
AbstractColumnReader<WritableDoubleVecto
         }
     }
 
+    @Override
+    protected void skipBatch(int num) {
+        int left = num;
+        while (left > 0) {
+            if (runLenDecoder.currentCount == 0) {
+                runLenDecoder.readNextGroup();
+            }
+            int n = Math.min(left, runLenDecoder.currentCount);
+            switch (runLenDecoder.mode) {
+                case RLE:
+                    if (runLenDecoder.currentValue == maxDefLevel) {
+                        skipDouble(n);
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; ++i) {
+                        if 
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+                                == maxDefLevel) {
+                            skipDouble(1);
+                        }
+                    }
+                    break;
+            }
+            left -= n;
+            runLenDecoder.currentCount -= n;
+        }
+    }
+
+    private void skipDouble(int num) {
+        skipDataBuffer(8 * num);
+    }
+
     @Override
     protected void readBatchFromDictionaryIds(
             int rowId, int num, WritableDoubleVector column, WritableIntVector 
dictionaryIds) {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java
index afce717a67..25e1b466e4 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java
@@ -25,7 +25,7 @@ import 
org.apache.paimon.data.columnar.writable.WritableLongVector;
 import org.apache.paimon.format.parquet.ParquetSchemaConverter;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveType;
 
@@ -39,8 +39,9 @@ public class FixedLenBytesColumnReader<VECTOR extends 
WritableColumnVector>
     private final int precision;
 
     public FixedLenBytesColumnReader(
-            ColumnDescriptor descriptor, PageReader pageReader, int precision) 
throws IOException {
-        super(descriptor, pageReader);
+            ColumnDescriptor descriptor, PageReadStore pageReadStore, int 
precision)
+            throws IOException {
+        super(descriptor, pageReadStore);
         checkTypeName(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY);
         this.precision = precision;
     }
@@ -79,6 +80,35 @@ public class FixedLenBytesColumnReader<VECTOR extends 
WritableColumnVector>
         }
     }
 
+    @Override
+    protected void skipBatch(int num) {
+        int bytesLen = descriptor.getPrimitiveType().getTypeLength();
+        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+            for (int i = 0; i < num; i++) {
+                if (runLenDecoder.readInteger() == maxDefLevel) {
+                    skipDataBinary(bytesLen);
+                }
+            }
+        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+
+            for (int i = 0; i < num; i++) {
+                if (runLenDecoder.readInteger() == maxDefLevel) {
+                    skipDataBinary(bytesLen);
+                }
+            }
+        } else {
+            for (int i = 0; i < num; i++) {
+                if (runLenDecoder.readInteger() == maxDefLevel) {
+                    skipDataBinary(bytesLen);
+                }
+            }
+        }
+    }
+
+    private void skipDataBinary(int len) {
+        skipDataBuffer(len);
+    }
+
     @Override
     protected void readBatchFromDictionaryIds(
             int rowId, int num, VECTOR column, WritableIntVector 
dictionaryIds) {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java
index 1f4adfa4b9..e9eec13df5 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java
@@ -22,7 +22,7 @@ import 
org.apache.paimon.data.columnar.writable.WritableFloatVector;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -31,9 +31,9 @@ import java.nio.ByteBuffer;
 /** Float {@link ColumnReader}. */
 public class FloatColumnReader extends 
AbstractColumnReader<WritableFloatVector> {
 
-    public FloatColumnReader(ColumnDescriptor descriptor, PageReader 
pageReader)
+    public FloatColumnReader(ColumnDescriptor descriptor, PageReadStore 
pageReadStore)
             throws IOException {
-        super(descriptor, pageReader);
+        super(descriptor, pageReadStore);
         checkTypeName(PrimitiveType.PrimitiveTypeName.FLOAT);
     }
 
@@ -70,6 +70,38 @@ public class FloatColumnReader extends 
AbstractColumnReader<WritableFloatVector>
         }
     }
 
+    @Override
+    protected void skipBatch(int num) {
+        int left = num;
+        while (left > 0) {
+            if (runLenDecoder.currentCount == 0) {
+                runLenDecoder.readNextGroup();
+            }
+            int n = Math.min(left, runLenDecoder.currentCount);
+            switch (runLenDecoder.mode) {
+                case RLE:
+                    if (runLenDecoder.currentValue == maxDefLevel) {
+                        skipFloat(n);
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; ++i) {
+                        if 
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+                                == maxDefLevel) {
+                            skipFloat(1);
+                        }
+                    }
+                    break;
+            }
+            left -= n;
+            runLenDecoder.currentCount -= n;
+        }
+    }
+
+    private void skipFloat(int num) {
+        skipDataBuffer(4 * num);
+    }
+
     @Override
     protected void readBatchFromDictionaryIds(
             int rowId, int num, WritableFloatVector column, WritableIntVector 
dictionaryIds) {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java
index e38e916d18..521ad998f6 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java
@@ -21,7 +21,7 @@ package org.apache.paimon.format.parquet.reader;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -30,8 +30,9 @@ import java.nio.ByteBuffer;
 /** Int {@link ColumnReader}. */
 public class IntColumnReader extends AbstractColumnReader<WritableIntVector> {
 
-    public IntColumnReader(ColumnDescriptor descriptor, PageReader pageReader) 
throws IOException {
-        super(descriptor, pageReader);
+    public IntColumnReader(ColumnDescriptor descriptor, PageReadStore 
pageReadStore)
+            throws IOException {
+        super(descriptor, pageReadStore);
         checkTypeName(PrimitiveType.PrimitiveTypeName.INT32);
     }
 
@@ -68,6 +69,38 @@ public class IntColumnReader extends 
AbstractColumnReader<WritableIntVector> {
         }
     }
 
+    @Override
+    protected void skipBatch(int num) {
+        int left = num;
+        while (left > 0) {
+            if (runLenDecoder.currentCount == 0) {
+                runLenDecoder.readNextGroup();
+            }
+            int n = Math.min(left, runLenDecoder.currentCount);
+            switch (runLenDecoder.mode) {
+                case RLE:
+                    if (runLenDecoder.currentValue == maxDefLevel) {
+                        skipInteger(n);
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; ++i) {
+                        if 
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+                                == maxDefLevel) {
+                            skipInteger(1);
+                        }
+                    }
+                    break;
+            }
+            left -= n;
+            runLenDecoder.currentCount -= n;
+        }
+    }
+
+    private void skipInteger(int num) {
+        skipDataBuffer(4 * num);
+    }
+
     @Override
     protected void readBatchFromDictionaryIds(
             int rowId, int num, WritableIntVector column, WritableIntVector 
dictionaryIds) {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java
index a8e04eae67..c4af086a70 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java
@@ -22,7 +22,7 @@ import 
org.apache.paimon.data.columnar.writable.WritableIntVector;
 import org.apache.paimon.data.columnar.writable.WritableLongVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -31,8 +31,9 @@ import java.nio.ByteBuffer;
 /** Long {@link ColumnReader}. */
 public class LongColumnReader extends AbstractColumnReader<WritableLongVector> 
{
 
-    public LongColumnReader(ColumnDescriptor descriptor, PageReader 
pageReader) throws IOException {
-        super(descriptor, pageReader);
+    public LongColumnReader(ColumnDescriptor descriptor, PageReadStore 
pageReadStore)
+            throws IOException {
+        super(descriptor, pageReadStore);
         checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
     }
 
@@ -69,6 +70,38 @@ public class LongColumnReader extends 
AbstractColumnReader<WritableLongVector> {
         }
     }
 
+    @Override
+    protected void skipBatch(int num) {
+        int left = num;
+        while (left > 0) {
+            if (runLenDecoder.currentCount == 0) {
+                runLenDecoder.readNextGroup();
+            }
+            int n = Math.min(left, runLenDecoder.currentCount);
+            switch (runLenDecoder.mode) {
+                case RLE:
+                    if (runLenDecoder.currentValue == maxDefLevel) {
+                        skipValue(n);
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; ++i) {
+                        if 
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+                                == maxDefLevel) {
+                            skipValue(1);
+                        }
+                    }
+                    break;
+            }
+            left -= n;
+            runLenDecoder.currentCount -= n;
+        }
+    }
+
+    private void skipValue(int num) {
+        skipDataBuffer(num * 8);
+    }
+
     @Override
     protected void readBatchFromDictionaryIds(
             int rowId, int num, WritableLongVector column, WritableIntVector 
dictionaryIds) {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
index 68225fbd13..8f20be2754 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
@@ -279,7 +279,7 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
             reader =
                     new NestedPrimitiveColumnReader(
                             descriptor,
-                            pages.getPageReader(descriptor),
+                            pages,
                             isUtcTimestamp,
                             descriptor.getPrimitiveType(),
                             field.getType(),
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
index 7d00ff7923..7db7aedbf6 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
@@ -44,6 +44,7 @@ import org.apache.parquet.column.page.DataPage;
 import org.apache.parquet.column.page.DataPageV1;
 import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
@@ -82,15 +83,6 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
 
     private final boolean isUtcTimestamp;
 
-    /** Total number of values read. */
-    private long valuesRead;
-
-    /**
-     * value that indicates the end of the current page. That is, if 
valuesRead ==
-     * endOfPageValueCount, we are at the end of the page.
-     */
-    private long endOfPageValueCount;
-
     /** If true, the current page is dictionary encoded. */
     private boolean isCurrentPageDictionaryEncoded;
 
@@ -104,7 +96,12 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
     private ParquetDataColumnReader dataColumn;
 
     /** Total values in the current page. */
-    private int pageValueCount;
+    //    private int pageValueCount;
+
+    /**
+     * Helper struct to track intermediate states while reading Parquet pages 
in the column chunk.
+     */
+    protected final ParquetReadState readState;
 
     // flag to indicate if there is no data in parquet data page
     private boolean eof = false;
@@ -115,7 +112,7 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
 
     public NestedPrimitiveColumnReader(
             ColumnDescriptor descriptor,
-            PageReader pageReader,
+            PageReadStore pageReadStore,
             boolean isUtcTimestamp,
             Type parquetType,
             DataType dataType,
@@ -124,7 +121,7 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
             throws IOException {
         this.descriptor = descriptor;
         this.type = parquetType;
-        this.pageReader = pageReader;
+        this.pageReader = pageReadStore.getPageReader(descriptor);
         this.maxDefLevel = descriptor.getMaxDefinitionLevel();
         this.isUtcTimestamp = isUtcTimestamp;
         this.dataType = dataType;
@@ -132,6 +129,9 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
         this.readMapKey = readMapKey;
 
         DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+
+        this.readState = new 
ParquetReadState(pageReadStore.getRowIndexes().orElse(null));
+
         if (dictionaryPage != null) {
             try {
                 this.dictionary =
@@ -166,23 +166,55 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
             isFirstRow = false;
         }
 
-        // index to set value.
-        int index = 0;
-        int valueIndex = 0;
         List<Object> valueList = new ArrayList<>();
 
+        int valueIndex = collectDataFromParquetPage(readNumber, valueList);
+
+        return fillColumnVector(valueIndex, valueList);
+    }
+
+    private int collectDataFromParquetPage(int total, List<Object> valueList) 
throws IOException {
+        int valueIndex = 0;
         // repeated type need two loops to read data.
-        while (!eof && index < readNumber) {
+
+        readState.resetForNewBatch(total);
+
+        while (!eof && readState.rowsToReadInBatch > 0) {
+
+            if (readState.isFinished()) { // finished to read
+                eof = true;
+                break;
+            }
+
+            long pageRowId = readState.rowId;
+            long rangeStart = readState.currentRangeStart();
+            long rangeEnd = readState.currentRangeEnd();
+
+            if (pageRowId > rangeEnd) {
+                readState.nextRange();
+                continue;
+            }
+
+            boolean needFilterSkip = pageRowId < rangeStart;
+
             do {
-                if (!lastValue.shouldSkip) {
+
+                if (!lastValue.shouldSkip && !needFilterSkip) {
                     valueList.add(lastValue.value);
                     valueIndex++;
                 }
             } while (readValue() && (repetitionLevel != 0));
-            index++;
+
+            if (pageRowId == readState.rowId) {
+                readState.rowId = readState.rowId + 1;
+            }
+
+            if (!needFilterSkip) {
+                readState.rowsToReadInBatch = readState.rowsToReadInBatch - 1;
+            }
         }
 
-        return fillColumnVector(valueIndex, valueList);
+        return valueIndex;
     }
 
     public LevelDelegation getLevelDelegation() {
@@ -255,20 +287,24 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
         // get the values of repetition and definitionLevel
         repetitionLevel = repetitionLevelColumn.nextInt();
         definitionLevel = definitionLevelColumn.nextInt();
-        valuesRead++;
+        readState.valuesToReadInPage = readState.valuesToReadInPage - 1;
         repetitionLevelList.add(repetitionLevel);
         definitionLevelList.add(definitionLevel);
     }
 
     private int readPageIfNeed() throws IOException {
         // Compute the number of values we want to read in this page.
-        int leftInPage = (int) (endOfPageValueCount - valuesRead);
-        if (leftInPage == 0) {
-            // no data left in current page, load data from new page
-            readPage();
-            leftInPage = (int) (endOfPageValueCount - valuesRead);
+        if (readState.valuesToReadInPage == 0) {
+            int pageValueCount = readPage();
+            // readPage() returns the number of values in the page just loaded
+            if (pageValueCount < 0) {
+                // We've read all the pages; this can happen when reading a
+                // repeated list, since we don't know where the list ends
+                // until we've seen all the pages.
+                return -1;
+            }
         }
-        return leftInPage;
+        return readState.valuesToReadInPage;
     }
 
     private Object readPrimitiveTypedRow(DataType category) {
@@ -528,33 +564,36 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
         return phbv;
     }
 
-    protected void readPage() {
+    protected int readPage() {
         DataPage page = pageReader.readPage();
 
         if (page == null) {
-            return;
+            return -1;
         }
 
-        page.accept(
-                new DataPage.Visitor<Void>() {
-                    @Override
-                    public Void visit(DataPageV1 dataPageV1) {
-                        readPageV1(dataPageV1);
-                        return null;
-                    }
+        long pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);
 
-                    @Override
-                    public Void visit(DataPageV2 dataPageV2) {
-                        readPageV2(dataPageV2);
-                        return null;
-                    }
-                });
+        int pageValueCount =
+                page.accept(
+                        new DataPage.Visitor<Integer>() {
+                            @Override
+                            public Integer visit(DataPageV1 dataPageV1) {
+                                return readPageV1(dataPageV1);
+                            }
+
+                            @Override
+                            public Integer visit(DataPageV2 dataPageV2) {
+                                return readPageV2(dataPageV2);
+                            }
+                        });
+        readState.resetForNewPage(pageValueCount, pageFirstRowIndex);
+        return pageValueCount;
     }
 
     private void initDataReader(Encoding dataEncoding, ByteBufferInputStream 
in, int valueCount)
             throws IOException {
-        this.pageValueCount = valueCount;
-        this.endOfPageValueCount = valuesRead + pageValueCount;
+        // The page/value counters formerly kept here are now tracked in
+        // readState (reset via readState.resetForNewPage in readPage).
         if (dataEncoding.usesDictionary()) {
             this.dataColumn = null;
             if (dictionary == null) {
@@ -577,13 +616,14 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
         }
 
         try {
-            dataColumn.initFromPage(pageValueCount, in);
+            dataColumn.initFromPage(valueCount, in);
         } catch (IOException e) {
             throw new IOException(String.format("Could not read page in col 
%s.", descriptor), e);
         }
     }
 
-    private void readPageV1(DataPageV1 page) {
+    private int readPageV1(DataPageV1 page) {
+        int pageValueCount = page.getValueCount();
         ValuesReader rlReader = 
page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
         ValuesReader dlReader = 
page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
         this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
@@ -597,15 +637,16 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
             LOG.debug("Reading definition levels at {}.", in.position());
             dlReader.initFromPage(pageValueCount, in);
             LOG.debug("Reading data at {}.", in.position());
-            initDataReader(page.getValueEncoding(), in, page.getValueCount());
+            initDataReader(page.getValueEncoding(), in, pageValueCount);
+            return pageValueCount;
         } catch (IOException e) {
             throw new ParquetDecodingException(
                     String.format("Could not read page %s in col %s.", page, 
descriptor), e);
         }
     }
 
-    private void readPageV2(DataPageV2 page) {
-        this.pageValueCount = page.getValueCount();
+    private int readPageV2(DataPageV2 page) {
+        int pageValueCount = page.getValueCount();
         this.repetitionLevelColumn =
                 newRLEIterator(descriptor.getMaxRepetitionLevel(), 
page.getRepetitionLevels());
         this.definitionLevelColumn =
@@ -615,8 +656,8 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
                     "Page data size {} bytes and {} records.",
                     page.getData().size(),
                     pageValueCount);
-            initDataReader(
-                    page.getDataEncoding(), page.getData().toInputStream(), 
page.getValueCount());
+            initDataReader(page.getDataEncoding(), 
page.getData().toInputStream(), pageValueCount);
+            return pageValueCount;
         } catch (IOException e) {
             throw new ParquetDecodingException(
                     String.format("Could not read page %s in col %s.", page, 
descriptor), e);
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java
new file mode 100644
index 0000000000..a600367682
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
+/** Parquet reader state for column index. */
+public class ParquetReadState {
+    /** A special row range used when there is no row indexes (hence all rows 
must be included). */
+    private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, 
Long.MAX_VALUE);
+
+    /**
+     * A special row range used when the row indexes are present AND all the 
row ranges have been
+     * processed. This serves as a sentinel at the end indicating that all 
rows come after the last
+     * row range should be skipped.
+     */
+    private static final RowRange END_ROW_RANGE = new RowRange(Long.MAX_VALUE, 
Long.MIN_VALUE);
+
+    private final Iterator<RowRange> rowRanges;
+
+    private RowRange currentRange;
+
+    /** row index for the next read. */
+    long rowId;
+
+    int valuesToReadInPage;
+    int rowsToReadInBatch;
+
+    public ParquetReadState(PrimitiveIterator.OfLong rowIndexes) {
+        this.rowRanges = constructRanges(rowIndexes);
+        nextRange();
+    }
+
+    /**
+     * Construct a list of row ranges from the given `rowIndexes`. For 
example, suppose the
+     * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 
3 row ranges: `[0-2],
+     * [4-5], [7-9]`.
+     */
+    private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong 
rowIndexes) {
+        if (rowIndexes == null) {
+            return null;
+        }
+
+        List<RowRange> rowRanges = new ArrayList<>();
+        long currentStart = Long.MIN_VALUE;
+        long previous = Long.MIN_VALUE;
+
+        while (rowIndexes.hasNext()) {
+            long idx = rowIndexes.nextLong();
+            if (currentStart == Long.MIN_VALUE) {
+                currentStart = idx;
+            } else if (previous + 1 != idx) {
+                RowRange range = new RowRange(currentStart, previous);
+                rowRanges.add(range);
+                currentStart = idx;
+            }
+            previous = idx;
+        }
+
+        if (previous != Long.MIN_VALUE) {
+            rowRanges.add(new RowRange(currentStart, previous));
+        }
+
+        return rowRanges.iterator();
+    }
+
+    /** Must be called at the beginning of reading a new batch. */
+    void resetForNewBatch(int batchSize) {
+        this.rowsToReadInBatch = batchSize;
+    }
+
+    /** Must be called at the beginning of reading a new page. */
+    void resetForNewPage(int totalValuesInPage, long pageFirstRowIndex) {
+        this.valuesToReadInPage = totalValuesInPage;
+        this.rowId = pageFirstRowIndex;
+    }
+
+    /** Returns the start index of the current row range. */
+    public long currentRangeStart() {
+        return currentRange.start;
+    }
+
+    /** Returns the end index of the current row range. */
+    public long currentRangeEnd() {
+        return currentRange.end;
+    }
+
+    public boolean isFinished() {
+        return this.currentRange.equals(this.END_ROW_RANGE);
+    }
+
+    public boolean isMaxRange() {
+        return this.currentRange.equals(this.MAX_ROW_RANGE);
+    }
+
+    public RowRange getCurrentRange() {
+        return currentRange;
+    }
+
+    /** Advance to the next range. */
+    public void nextRange() {
+        if (rowRanges == null) {
+            currentRange = MAX_ROW_RANGE;
+        } else if (!rowRanges.hasNext()) {
+            currentRange = END_ROW_RANGE;
+        } else {
+            currentRange = rowRanges.next();
+        }
+    }
+
+    /** Helper struct to represent a range of row indexes `[start, end]`. */
+    public static class RowRange {
+        final long start;
+        final long end;
+
+        RowRange(long start, long end) {
+            this.start = start;
+            this.end = end;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof RowRange)) {
+                return false;
+            }
+            return ((RowRange) obj).start == this.start && ((RowRange) 
obj).end == this.end;
+        }
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
index 860ec54fa8..a2be77414d 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -87,58 +87,45 @@ public class ParquetSplitReaderUtil {
                 getAllColumnDescriptorByType(depth, type, columnDescriptors);
         switch (fieldType.getTypeRoot()) {
             case BOOLEAN:
-                return new BooleanColumnReader(
-                        descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                return new BooleanColumnReader(descriptors.get(0), pages);
             case TINYINT:
-                return new ByteColumnReader(
-                        descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                return new ByteColumnReader(descriptors.get(0), pages);
             case DOUBLE:
-                return new DoubleColumnReader(
-                        descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                return new DoubleColumnReader(descriptors.get(0), pages);
             case FLOAT:
-                return new FloatColumnReader(
-                        descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                return new FloatColumnReader(descriptors.get(0), pages);
             case INTEGER:
             case DATE:
             case TIME_WITHOUT_TIME_ZONE:
-                return new IntColumnReader(
-                        descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                return new IntColumnReader(descriptors.get(0), pages);
             case BIGINT:
-                return new LongColumnReader(
-                        descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                return new LongColumnReader(descriptors.get(0), pages);
             case SMALLINT:
-                return new ShortColumnReader(
-                        descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                return new ShortColumnReader(descriptors.get(0), pages);
             case CHAR:
             case VARCHAR:
             case BINARY:
             case VARBINARY:
-                return new BytesColumnReader(
-                        descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                return new BytesColumnReader(descriptors.get(0), pages);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 if (descriptors.get(0).getPrimitiveType().getPrimitiveTypeName()
                         == PrimitiveType.PrimitiveTypeName.INT64) {
-                    return new LongColumnReader(
-                            descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                    return new LongColumnReader(descriptors.get(0), pages);
                 }
-                return new TimestampColumnReader(
-                        true, descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                return new TimestampColumnReader(true, descriptors.get(0), pages);
             case DECIMAL:
                 switch (descriptors.get(0).getPrimitiveType().getPrimitiveTypeName()) {
                     case INT32:
-                        return new IntColumnReader(
-                                descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                        return new IntColumnReader(descriptors.get(0), pages);
                     case INT64:
-                        return new LongColumnReader(
-                                descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                        return new LongColumnReader(descriptors.get(0), pages);
                     case BINARY:
-                        return new BytesColumnReader(
-                                descriptors.get(0), pages.getPageReader(descriptors.get(0)));
+                        return new BytesColumnReader(descriptors.get(0), pages);
                     case FIXED_LEN_BYTE_ARRAY:
                         return new FixedLenBytesColumnReader(
                                 descriptors.get(0),
-                                pages.getPageReader(descriptors.get(0)),
+                                pages,
                                 ((DecimalType) fieldType).getPrecision());
                 }
             case ARRAY:
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java
index 2dd1655d57..ebb8f28fa1 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java
@@ -194,6 +194,51 @@ final class RunLengthDecoder {
         }
     }
 
+    void skipDictionaryIds(int total, int level, RunLengthDecoder data) {
+        int left = total;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    if (currentValue == level) {
+                        data.skipDictionaryIdData(n);
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; ++i) {
+                        if (currentBuffer[currentBufferIdx++] == level) {
+                            data.readInteger();
+                        }
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    private void skipDictionaryIdData(int total) {
+        int left = total;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    break;
+                case PACKED:
+                    currentBufferIdx += n;
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
     /** Reads the next varint encoded int. */
     private int readUnsignedVarInt() throws IOException {
         int value = 0;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java
index 7b32232261..bdb2f401fa 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java
@@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector;
 import org.apache.paimon.data.columnar.writable.WritableShortVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -30,9 +30,9 @@ import java.io.IOException;
 /** Short {@link ColumnReader}. Using INT32 to store short, so just cast int to short. */
 public class ShortColumnReader extends AbstractColumnReader<WritableShortVector> {
 
-    public ShortColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
+    public ShortColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore)
             throws IOException {
-        super(descriptor, pageReader);
+        super(descriptor, pageReadStore);
         checkTypeName(PrimitiveType.PrimitiveTypeName.INT32);
     }
 
@@ -71,6 +71,38 @@ public class ShortColumnReader extends AbstractColumnReader<WritableShortVector>
         }
     }
 
+    @Override
+    protected void skipBatch(int num) {
+        int left = num;
+        while (left > 0) {
+            if (runLenDecoder.currentCount == 0) {
+                runLenDecoder.readNextGroup();
+            }
+            int n = Math.min(left, runLenDecoder.currentCount);
+            switch (runLenDecoder.mode) {
+                case RLE:
+                    if (runLenDecoder.currentValue == maxDefLevel) {
+                        skipShot(n);
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; ++i) {
+                        if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+                                == maxDefLevel) {
+                            skipShot(1);
+                        }
+                    }
+                    break;
+            }
+            left -= n;
+            runLenDecoder.currentCount -= n;
+        }
+    }
+
+    private void skipShot(int num) {
+        skipDataBuffer(4 * num);
+    }
+
     @Override
     protected void readBatchFromDictionaryIds(
             int rowId, int num, WritableShortVector column, WritableIntVector dictionaryIds) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
index 4a279ff90e..8767173315 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
@@ -23,7 +23,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector;
 import org.apache.paimon.data.columnar.writable.WritableTimestampVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveType;
 
@@ -49,9 +49,9 @@ public class TimestampColumnReader extends AbstractColumnReader<WritableTimestam
     private final boolean utcTimestamp;
 
     public TimestampColumnReader(
-            boolean utcTimestamp, ColumnDescriptor descriptor, PageReader pageReader)
+            boolean utcTimestamp, ColumnDescriptor descriptor, PageReadStore pageReadStore)
             throws IOException {
-        super(descriptor, pageReader);
+        super(descriptor, pageReadStore);
         this.utcTimestamp = utcTimestamp;
         checkTypeName(PrimitiveType.PrimitiveTypeName.INT96);
     }
@@ -75,6 +75,15 @@ public class TimestampColumnReader extends AbstractColumnReader<WritableTimestam
         }
     }
 
+    @Override
+    protected void skipBatch(int num) {
+        for (int i = 0; i < num; i++) {
+            if (runLenDecoder.readInteger() == maxDefLevel) {
+                skipDataBuffer(12);
+            }
+        }
+    }
+
     @Override
     protected void readBatchFromDictionaryIds(
             int rowId, int num, WritableTimestampVector column, WritableIntVector dictionaryIds) {

Reply via email to