This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8127e1be22 [parquet] Fix parquet Vectored read in ParquetFileReader (#5792)
8127e1be22 is described below
commit 8127e1be221209ccd625293f38191ec8eaff0501
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 25 15:00:18 2025 +0800
[parquet] Fix parquet Vectored read in ParquetFileReader (#5792)
---
.../apache/parquet/hadoop/ParquetFileReader.java | 29 ++++++++--------------
1 file changed, 11 insertions(+), 18 deletions(-)
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 256eec1a1c..05e9d12033 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -20,6 +20,7 @@ package org.apache.parquet.hadoop;
import org.apache.paimon.format.parquet.ParquetInputFile;
import org.apache.paimon.format.parquet.ParquetInputStream;
+import org.apache.paimon.fs.FileRange;
import org.apache.paimon.fs.VectoredReadable;
import org.apache.paimon.utils.RoaringBitmap32;
@@ -74,7 +75,6 @@ import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.ParquetFileRange;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
@@ -585,14 +585,8 @@ public class ParquetFileReader implements Closeable {
List<ConsecutivePartList> allParts, ChunkListBuilder builder)
throws IOException {
if (shouldUseVectoredIo(allParts)) {
- try {
- readVectored(allParts, builder);
- return;
- } catch (IllegalArgumentException | UnsupportedOperationException e) {
- // Either the arguments are wrong or somehow this is being invoked against
- // a hadoop release which doesn't have the API and yet somehow it got here.
- LOG.warn("readVectored() failed; falling back to normal IO against {}", f, e);
- }
+ readVectored(allParts, builder);
+ return;
}
for (ConsecutivePartList consecutiveChunks : allParts) {
consecutiveChunks.readAll(f, builder);
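
The dropped try/catch above existed to fall back to normal IO when the vectored-read API was missing from the Hadoop release in use; with the read now dispatched to Paimon's own VectoredReadable (next hunk), that failure mode no longer applies and shouldUseVectoredIo(allParts) remains the only gate. A minimal sketch of the kind of capability check such a gate can reduce to; the helper name and its ParquetInputStream parameter type are assumptions, while f.in() and VectoredReadable come from this patch:

    // Hypothetical helper: only attempt vectored IO when the wrapped Paimon
    // stream actually implements VectoredReadable.
    private boolean streamSupportsVectoredIo(ParquetInputStream f) {
        return f.in() instanceof VectoredReadable;
    }
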
@@ -650,7 +644,7 @@ public class ParquetFileReader implements Closeable {
private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
throws IOException {
- List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
+ List<FileRange> ranges = new ArrayList<>(allParts.size());
long totalSize = 0;
for (ConsecutivePartList consecutiveChunks : allParts) {
final long len = consecutiveChunks.length;
@@ -658,16 +652,16 @@ public class ParquetFileReader implements Closeable {
len < Integer.MAX_VALUE,
"Invalid length %s for vectored read operation. It must be
less than max integer value.",
len);
- ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) len));
+ ranges.add(FileRange.createFileRange(consecutiveChunks.offset, (int) len));
totalSize += len;
}
LOG.debug(
"Reading {} bytes of data with vectored IO in {} ranges",
totalSize, ranges.size());
// Request a vectored read;
- f.readVectored(ranges, options.getAllocator());
+ ((VectoredReadable) f.in()).readVectored(ranges);
int k = 0;
for (ConsecutivePartList consecutivePart : allParts) {
- ParquetFileRange currRange = ranges.get(k++);
+ FileRange currRange = ranges.get(k++);
consecutivePart.readFromVectoredRange(currRange, builder);
}
}
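
Taken on its own, the new read path in this hunk reduces to building Paimon FileRange objects and handing them to the stream in a single call. A short sketch of that pattern, assuming (as the cast above already does) that f.in() implements VectoredReadable; the offsets and lengths are illustrative only:

    // Describe the byte ranges to fetch; the offset/length pairs here are
    // examples, not real column-chunk coordinates.
    List<FileRange> ranges = new ArrayList<>();
    ranges.add(FileRange.createFileRange(0L, 4096));
    ranges.add(FileRange.createFileRange(1_048_576L, 8192));

    // Issue all reads at once through Paimon's vectored-read interface.
    ((VectoredReadable) f.in()).readVectored(ranges);
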
@@ -1667,9 +1661,9 @@ public class ParquetFileReader implements Closeable {
* @throws IOException if there is an error while reading from the stream, including a
* timeout.
*/
- public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder builder)
+ public void readFromVectoredRange(FileRange currRange, ChunkListBuilder builder)
throws IOException {
- ByteBuffer buffer;
+ byte[] buffer;
final long timeoutSeconds = HADOOP_VECTORED_READ_TIMEOUT_SECONDS;
long readStart = System.nanoTime();
try {
@@ -1678,8 +1672,7 @@ public class ParquetFileReader implements Closeable {
currRange,
timeoutSeconds);
buffer =
- FutureIO.awaitFuture(
- currRange.getDataReadFuture(), timeoutSeconds, TimeUnit.SECONDS);
+ FutureIO.awaitFuture(currRange.getData(), timeoutSeconds, TimeUnit.SECONDS);
setReadMetrics(readStart, currRange.getLength());
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(currRange.getLength());
@@ -1691,7 +1684,7 @@ public class ParquetFileReader implements Closeable {
LOG.error(error, e);
throw new IOException(error, e);
}
- ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
+ ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer));
for (ChunkDescriptor descriptor : chunks) {
builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
}
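
The final hunk switches the per-range result from Parquet's ByteBuffer future to Paimon's FileRange.getData(), which, as the byte[] buffer above implies, completes with a plain byte array. A minimal sketch of consuming one completed range the same way; the range variable is assumed to be a FileRange already submitted via readVectored, and the timeout constant is the one referenced earlier in this method:

    // Wait for the range's bytes, bounded by the reader's vectored-read timeout.
    byte[] bytes =
            FutureIO.awaitFuture(
                    range.getData(), HADOOP_VECTORED_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);

    // Re-wrap the bytes so the existing ByteBufferInputStream-based chunk
    // slicing keeps working unchanged.
    ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes));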