[
https://issues.apache.org/jira/browse/PARQUET-2171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788999#comment-17788999
]
ASF GitHub Bot commented on PARQUET-2171:
-----------------------------------------
wgtmac commented on code in PR #1139:
URL: https://github.com/apache/parquet-mr/pull/1139#discussion_r1402925218
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -125,6 +130,8 @@ public class ParquetFileReader implements Closeable {
public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
+ public static final long HADOOP_VECTORED_READ_TIMEOUT_SECONDS = 300;
Review Comment:
Any explanation for the magic number 300 here? Should it be configurable?
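(For illustration only, not part of the PR: one way to make this configurable would be to read the timeout from the Hadoop Configuration, keeping 300 seconds as the default. The property name below is hypothetical.)
```java
import org.apache.hadoop.conf.Configuration;

// Sketch only: a configurable timeout with the current hard-coded value as default.
// The property name "parquet.hadoop.vectored.io.timeout.seconds" is hypothetical.
public final class VectoredReadTimeout {
  public static final String TIMEOUT_SECONDS_KEY = "parquet.hadoop.vectored.io.timeout.seconds";
  public static final long DEFAULT_TIMEOUT_SECONDS = 300;

  private VectoredReadTimeout() {}

  public static long getTimeoutSeconds(Configuration conf) {
    // Falls back to 300 seconds when the property is not set.
    return conf.getLong(TIMEOUT_SECONDS_KEY, DEFAULT_TIMEOUT_SECONDS);
  }
}
```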
##########
parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java:
##########
@@ -327,6 +336,9 @@ public Builder ctorImpl(String className, Class<?>... argClasses) {
.buildChecked();
} catch (NoSuchMethodException e) {
// not the right implementation
+ LOG.debug("failed to load constructor arity {} from class {}",
+ argClasses.length, className, e);
+
Review Comment:
Please remove the blank line.
##########
parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java:
##########
@@ -91,6 +92,7 @@ public Builder(Configuration conf, Path filePath) {
super(new HadoopParquetConfiguration(conf));
this.conf = conf;
this.filePath = filePath;
+
Review Comment:
ditto
##########
parquet-hadoop/README.md:
##########
@@ -501,3 +501,11 @@ If `false`, key material is stored in separate new files, created in the same fo
**Description:** Length of key encryption keys (KEKs), randomly generated by parquet key management tools. Can be 128, 192 or 256 bits.
**Default value:** `128`
+---
+
+**Property:** `parquet.hadoop.vectored.io.enabled`
Review Comment:
Thanks for adding the doc!
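(A minimal usage sketch, not from the PR: the property documented above can be set on the Hadoop Configuration the reader is built from.)
```java
import org.apache.hadoop.conf.Configuration;

public class EnableVectoredIO {
  public static void main(String[] args) {
    // Enable Hadoop vectored IO for Parquet reads via the documented property;
    // readers built from this Configuration may then issue vectored reads
    // where the underlying FileSystem supports them.
    Configuration conf = new Configuration();
    conf.setBoolean("parquet.hadoop.vectored.io.enabled", true);
  }
}
```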
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.hadoop.util.vectorio;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UncheckedIOException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.util.DynMethods;
+
+/**
+ * Binding utils.
+ */
+public final class BindingUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class);
+
Review Comment:
```suggestion
```
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1063,6 +1068,69 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r
    return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex));
  }
+  /**
+   * Read data in all parts via either vectored IO or serial IO.
+   * @param allParts all parts to be read.
+   * @param builder used to build chunk list to read the pages for the different columns.
+   * @throws IOException any IOE.
+   */
+  private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
+      throws IOException {
+    boolean isVectoredIO = options.useHadoopVectoredIO()
+        && f.readVectoredAvailable()
+        && partsLengthValidForVectoredIO(allParts);
+    if (isVectoredIO) {
+      readVectored(allParts, builder);
+    } else {
+      for (ConsecutivePartList consecutiveChunks : allParts) {
+        consecutiveChunks.readAll(f, builder);
+      }
+    }
+  }
+
+  /**
+   * Vectored IO doesn't support reading ranges of size greater than
+   * Integer.MAX_VALUE.
+   * @param allParts all parts to read.
+   * @return true or false.
+   */
+  private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts) {
+    for (ConsecutivePartList consecutivePart : allParts) {
+      if (consecutivePart.length >= Integer.MAX_VALUE) {
+        LOG.debug("Part length {} greater than Integer.MAX_VALUE thus disabling vectored IO", consecutivePart.length);
Review Comment:
LOG.warn ?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1063,6 +1068,69 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r
    return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex));
  }
+  /**
+   * Read data in all parts via either vectored IO or serial IO.
+   * @param allParts all parts to be read.
+   * @param builder used to build chunk list to read the pages for the different columns.
+   * @throws IOException any IOE.
+   */
+  private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
+      throws IOException {
+    boolean isVectoredIO = options.useHadoopVectoredIO()
+        && f.readVectoredAvailable()
+        && partsLengthValidForVectoredIO(allParts);
+    if (isVectoredIO) {
+      readVectored(allParts, builder);
+    } else {
+      for (ConsecutivePartList consecutiveChunks : allParts) {
+        consecutiveChunks.readAll(f, builder);
+      }
+    }
+  }
+
+  /**
+   * Vectored IO doesn't support reading ranges of size greater than
+   * Integer.MAX_VALUE.
+   * @param allParts all parts to read.
+   * @return true or false.
+   */
+  private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts) {
+    for (ConsecutivePartList consecutivePart : allParts) {
+      if (consecutivePart.length >= Integer.MAX_VALUE) {
+        LOG.debug("Part length {} greater than Integer.MAX_VALUE thus disabling vectored IO", consecutivePart.length);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Read all parts through vectored IO.
+   * @param allParts all parts to be read.
+   * @param builder used to build chunk list to read the pages for the different columns.
+   * @throws IOException any IOE.
+   */
+  private void readVectored(List<ConsecutivePartList> allParts,
+      ChunkListBuilder builder) throws IOException {
+
+    List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
+    for (ConsecutivePartList consecutiveChunks : allParts) {
+      Preconditions.checkArgument(consecutiveChunks.length < Integer.MAX_VALUE,
+        "Invalid length %s for vectored read operation. It must be less than max integer value.",
+        consecutiveChunks.length);
+      ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) consecutiveChunks.length));
+    }
+    LOG.debug("Doing vectored IO for ranges {}", ranges);
+    ByteBufferAllocator allocator = options.getAllocator();
+    //blocking or asynchronous vectored read.
+    f.readVectored(ranges, allocator::allocate);
+    int k = 0;
+ for (ConsecutivePartList consecutivePart : allParts) {
Review Comment:
```suggestion
for (ConsecutivePartList consecutivePart : allParts) {
```
##########
parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java:
##########
@@ -105,4 +107,21 @@ public abstract class SeekableInputStream extends InputStream {
*/
public abstract void readFully(ByteBuffer buf) throws IOException;
+ /**
+ * Read a set of file ranges in a vectored manner.
Review Comment:
It would be good to say what to expect if we have overlapping ranges.
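(Illustrative sketch, not part of the PR: one way a caller could check that the ranges passed to readVectored are disjoint before issuing the read. It assumes ParquetFileRange exposes getOffset()/getLength() accessors.)
```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.parquet.io.ParquetFileRange;

// Illustrative helper, not from the PR: verify that requested ranges are
// disjoint before issuing a vectored read. Assumes ParquetFileRange exposes
// getOffset()/getLength() accessors.
public final class RangeChecks {

  private RangeChecks() {}

  public static boolean areDisjoint(List<ParquetFileRange> ranges) {
    // Sort a copy by offset, then check each range ends before the next begins.
    List<ParquetFileRange> sorted = new ArrayList<>(ranges);
    sorted.sort(Comparator.comparingLong(ParquetFileRange::getOffset));
    for (int i = 1; i < sorted.size(); i++) {
      ParquetFileRange prev = sorted.get(i - 1);
      if (prev.getOffset() + prev.getLength() > sorted.get(i).getOffset()) {
        return false; // this range starts before the previous one ends
      }
    }
    return true;
  }
}
```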
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1063,6 +1068,69 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r
    return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex));
  }
+  /**
+   * Read data in all parts via either vectored IO or serial IO.
+   * @param allParts all parts to be read.
+   * @param builder used to build chunk list to read the pages for the different columns.
+   * @throws IOException any IOE.
+   */
+  private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
+      throws IOException {
+    boolean isVectoredIO = options.useHadoopVectoredIO()
+        && f.readVectoredAvailable()
+        && partsLengthValidForVectoredIO(allParts);
+    if (isVectoredIO) {
+      readVectored(allParts, builder);
+    } else {
+      for (ConsecutivePartList consecutiveChunks : allParts) {
+        consecutiveChunks.readAll(f, builder);
+      }
+    }
+  }
+
+  /**
+   * Vectored IO doesn't support reading ranges of size greater than
+   * Integer.MAX_VALUE.
+   * @param allParts all parts to read.
+   * @return true or false.
+   */
+  private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts) {
+    for (ConsecutivePartList consecutivePart : allParts) {
+      if (consecutivePart.length >= Integer.MAX_VALUE) {
+        LOG.debug("Part length {} greater than Integer.MAX_VALUE thus disabling vectored IO", consecutivePart.length);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Read all parts through vectored IO.
+   * @param allParts all parts to be read.
+   * @param builder used to build chunk list to read the pages for the different columns.
+   * @throws IOException any IOE.
+   */
+  private void readVectored(List<ConsecutivePartList> allParts,
+      ChunkListBuilder builder) throws IOException {
+
+    List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
+    for (ConsecutivePartList consecutiveChunks : allParts) {
+      Preconditions.checkArgument(consecutiveChunks.length < Integer.MAX_VALUE,
Review Comment:
You have already checked this in partsLengthValidForVectoredIO; is it necessary to check it again here?
> Implement vectored IO in parquet file format
> --------------------------------------------
>
> Key: PARQUET-2171
> URL: https://issues.apache.org/jira/browse/PARQUET-2171
> Project: Parquet
> Issue Type: New Feature
> Components: parquet-mr
> Reporter: Mukund Thakur
> Priority: Major
>
> We recently added a new feature called vectored IO in Hadoop to improve
> read performance for seek-heavy readers. Spark jobs and others that use
> parquet will greatly benefit from this API. Details can be found here:
> [https://github.com/apache/hadoop/commit/e1842b2a749d79cbdc15c524515b9eda64c339d5]
> https://issues.apache.org/jira/browse/HADOOP-18103
> https://issues.apache.org/jira/browse/HADOOP-11867
--
This message was sent by Atlassian Jira
(v8.20.10#820010)