[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637846#comment-17637846 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

wgtmac commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1030605835


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.BlockCipher.Decryptor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader.Chunk;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the reading of a single page.
+ */
+public class FilePageReader implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FilePageReader.class);
+
+  private final ParquetFileReader parquetFileReader;
+  private final Chunk chunk;
+  private final int currentBlock;
+  private final BlockCipher.Decryptor headerBlockDecryptor;
+  private final BlockCipher.Decryptor pageBlockDecryptor;
+  private final byte[] aadPrefix;
+  private final int rowGroupOrdinal;
+  private final int columnOrdinal;
+
+  //state
+  private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+  private DictionaryPage dictionaryPage = null;
+  private int pageIndex = 0;
+  private long valuesCountReadSoFar = 0;
+  private int dataPageCountReadSoFar = 0;
+
+  // derived
+  private final PrimitiveType type;
+  private final byte[] dataPageAAD;
+  private byte[] dataPageHeaderAAD = null;
+
+  private final BytesInputDecompressor decompressor;
+
+  private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+  private final LongAdder totalTimeReadOnePage = new LongAdder();
+  private final LongAdder totalCountReadOnePage = new LongAdder();
+  private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L);
+  private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+  private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+  private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L);
+
+  public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int currentBlock,
+    Decryptor headerBlockDecryptor,
+    Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal,
+    BytesInputDecompressor decompressor
+  ) {
+    this.parquetFileReader = parquetFileReader;
+    this.chunk = chunk;
+    this.currentBlock = currentBlock;
+    this.headerBlockDecryptor = headerBlockDecryptor;
+    this.pageBlockDecryptor = pageBlockDecryptor;
+    this.aadPrefix = aadPrefix;
+    this.rowGroupOrdinal = rowGroupOrdinal;
+    this.columnOrdinal = columnOrdinal;
+    this.decompressor = decompressor;
+
+    this.type = parquetFileReader.getFileMetaData().getSchema()
+      .getType(chunk.getDescriptor().getCol().getPath()).asPrimitiveType();
+
+    if (null != headerBlockDecryptor) {
+      dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader,
+        rowGroupOrdinal,
+        columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+    }
+    if (null != pageBlockDecryptor) {
+      dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal,
+        columnOrdinal, 0);
+    } else {
+      dataPageAAD = null;
+    }
+  }
+
+  public DictionaryPage getDictionaryPage() {
+    return this.dictionaryPage;
+  }
+
+  public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk() {
+    return this.pagesInChunk;
+  }
+
+  void readAllRemainingPagesAsync() {
+    readFutures.offer(ParquetFileReader.processThreadPool.submit(new FilePageReaderTask(this)));
+  }
+
+  void readAllRemainingPages() throws IOException {
+    while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+      readOnePage();
+    }
+    if (chunk.offsetIndex == null
+      && valuesCountReadSoFar != chunk.getDescriptor().getMetadata().getValueCount()) {
+      // Would be nice to have a CorruptParquetFileException or something as a subclass?
+      throw new IOException(
+        "Expected " + chunk.getDescriptor().getMetadata().getValueCount()
+          + " values in column chunk at " +
+          parquetFileReader.getPath() + " offset "
+          + chunk.descriptor.getMetadata().getFirstDataPageOffset() +
+          " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+          + " pages ending at file offset " + (chunk.getDescriptor().getFileOffset()
+          + chunk.stream.position()));
+    }
+    try {
+      pagesInChunk.put(Optional.empty()); // add a marker for end of data
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while reading page data", e);
+    }
+  }
+
+  void readOnePage() throws IOException {
+    long startRead = System.nanoTime();
+    try {
+      byte[] pageHeaderAAD = dataPageHeaderAAD;
+      if (null != headerBlockDecryptor) {
+        // Important: this verifies file integrity (makes sure dictionary page had not been removed)
+        if (null == dictionaryPage && chunk.getDescriptor().getMetadata().hasDictionaryPage()) {
+          pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader,
+            rowGroupOrdinal, columnOrdinal, -1);
+        } else {
+          int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+          AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+        }
+      }
+      PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, pageHeaderAAD);
+      int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      final BytesInput pageBytes;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          // there is only one dictionary page per column chunk
+          if (dictionaryPage != null) {
+            throw new ParquetDecodingException(
+              "more than one dictionary page in column " + 
chunk.getDescriptor().getCol());
+          }
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);
+          if (parquetFileReader.options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+            chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+              "could not verify dictionary page integrity, CRC checksum verification failed");
+          }
+          DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
+          DictionaryPage compressedDictionaryPage =
+            new DictionaryPage(
+              pageBytes,
+              uncompressedPageSize,
+              dicHeader.getNum_values(),
+              parquetFileReader.converter.getEncoding(dicHeader.getEncoding())
+            );
+          // Copy crc to new page, used for testing
+          if (pageHeader.isSetCrc()) {
+            compressedDictionaryPage.setCrc(pageHeader.getCrc());
+          }
+          dictionaryPage = compressedDictionaryPage;
+          break;
+        case DATA_PAGE:
+          DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);

Review Comment:
   `chunk.readAsBytesInput(compressedPageSize)` runs inside `processThreadPool` and does the following:
   ```
       public BytesInput readAsBytesInput(int size) throws IOException {
         if (LOG.isDebugEnabled()) {
           String mode = (isAsyncIOReaderEnabled()) ? "ASYNC" : "SYNC";
           LOG.debug("{} READ BYTES INPUT: stream {}", mode, stream);
         }
         return BytesInput.from(stream.sliceBuffers(size));
       }
   ```
   
   `stream.sliceBuffers(size)` is a blocking call to `SequenceByteBufferInputStream.sliceBuffers()`,
   which internally calls `AsyncMultiBufferInputStream.sliceBuffers()` and waits for
   `AsyncMultiBufferInputStream.nextBuffer()` to return. Note that
   `AsyncMultiBufferInputStream.nextBuffer()` runs in the `ioThreadPool`.
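
To make the blocking dependency concrete, here is a minimal, self-contained sketch (hypothetical class and pool names, not the actual parquet-mr code) of a task on a processing pool blocking on the result of a task running on a separate I/O pool:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TwoPoolBlockingSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService ioPool = Executors.newFixedThreadPool(2);      // stands in for ioThreadPool
    ExecutorService processPool = Executors.newFixedThreadPool(2); // stands in for processThreadPool

    // I/O task: pretends to fetch the next 8MB buffer from storage.
    Future<byte[]> nextBuffer = ioPool.submit(() -> {
      Thread.sleep(100); // simulated storage latency
      return new byte[8 * 1024 * 1024];
    });

    // Processing task: blocks until the I/O task has produced the buffer,
    // analogous to readAsBytesInput() blocking inside stream.sliceBuffers(size).
    Future<Integer> sliced = processPool.submit(() -> nextBuffer.get().length);

    System.out.println("bytes visible to the processing task: " + sliced.get());
    ioPool.shutdown();
    processPool.shutdown();
  }
}
```

In this shape a processing thread is parked inside `get()` for the whole duration of the storage read, which is the behaviour described above for `stream.sliceBuffers(size)`.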



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import 
org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.BlockCipher.Decryptor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader.Chunk;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the reading of a single page.
+ */
+public class FilePageReader implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FilePageReader.class);
+
+  private final ParquetFileReader parquetFileReader;
+  private final Chunk chunk;
+  private final int currentBlock;
+  private final BlockCipher.Decryptor headerBlockDecryptor;
+  private final BlockCipher.Decryptor pageBlockDecryptor;
+  private final byte[] aadPrefix;
+  private final int rowGroupOrdinal;
+  private final int columnOrdinal;
+
+  //state
+  private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new 
LinkedBlockingDeque<>();
+  private DictionaryPage dictionaryPage = null;
+  private int pageIndex = 0;
+  private long valuesCountReadSoFar = 0;
+  private int dataPageCountReadSoFar = 0;
+
+  // derived
+  private final PrimitiveType type;
+  private final byte[] dataPageAAD;
+  private byte[] dataPageHeaderAAD = null;
+
+  private final BytesInputDecompressor decompressor;
+
+  private final ConcurrentLinkedQueue<Future<Void>> readFutures = new 
ConcurrentLinkedQueue<>();
+
+  private final LongAdder totalTimeReadOnePage = new LongAdder();
+  private final LongAdder totalCountReadOnePage = new LongAdder();
+  private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+  private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+  private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+  private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+  public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int 
currentBlock,
+    Decryptor headerBlockDecryptor,
+    Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+    BytesInputDecompressor decompressor
+  ) {
+    this.parquetFileReader = parquetFileReader;
+    this.chunk = chunk;
+    this.currentBlock = currentBlock;
+    this.headerBlockDecryptor = headerBlockDecryptor;
+    this.pageBlockDecryptor = pageBlockDecryptor;
+    this.aadPrefix = aadPrefix;
+    this.rowGroupOrdinal = rowGroupOrdinal;
+    this.columnOrdinal = columnOrdinal;
+    this.decompressor = decompressor;
+
+    this.type = parquetFileReader.getFileMetaData().getSchema()
+      .getType(chunk.getDescriptor().getCol().getPath()).asPrimitiveType();
+
+    if (null != headerBlockDecryptor) {
+      dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+        rowGroupOrdinal,
+        columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+    }
+    if (null != pageBlockDecryptor) {
+      dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, 
rowGroupOrdinal,
+        columnOrdinal, 0);
+    } else {
+      dataPageAAD = null;
+    }
+  }
+
+  public DictionaryPage getDictionaryPage() {
+    return this.dictionaryPage;
+  }
+
+  public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk() {
+    return this.pagesInChunk;
+  }
+
+  void readAllRemainingPagesAsync() {
+    readFutures.offer(ParquetFileReader.processThreadPool.submit(new 
FilePageReaderTask(this)));
+  }
+
+  void readAllRemainingPages() throws IOException {
+    while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+      readOnePage();
+    }
+    if (chunk.offsetIndex == null
+      && valuesCountReadSoFar != 
chunk.getDescriptor().getMetadata().getValueCount()) {
+      // Would be nice to have a CorruptParquetFileException or something as a 
subclass?
+      throw new IOException(
+        "Expected " + chunk.getDescriptor().getMetadata().getValueCount()
+          + " values in column chunk at " +
+          parquetFileReader.getPath() + " offset "
+          + chunk.descriptor.getMetadata().getFirstDataPageOffset() +
+          " but got " + valuesCountReadSoFar + " values instead over " + 
pagesInChunk.size()
+          + " pages ending at file offset " + 
(chunk.getDescriptor().getFileOffset()
+          + chunk.stream.position()));
+    }
+    try {
+      pagesInChunk.put(Optional.empty()); // add a marker for end of data
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while reading page data", e);
+    }
+  }
+
+  void readOnePage() throws IOException {
+    long startRead = System.nanoTime();
+    try {
+      byte[] pageHeaderAAD = dataPageHeaderAAD;
+      if (null != headerBlockDecryptor) {
+        // Important: this verifies file integrity (makes sure dictionary page 
had not been removed)
+        if (null == dictionaryPage && 
chunk.getDescriptor().getMetadata().hasDictionaryPage()) {
+          pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPageHeader,
+            rowGroupOrdinal, columnOrdinal, -1);
+        } else {
+          int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+          AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+        }
+      }
+      PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, 
pageHeaderAAD);
+      int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      final BytesInput pageBytes;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          // there is only one dictionary page per column chunk
+          if (dictionaryPage != null) {
+            throw new ParquetDecodingException(
+              "more than one dictionary page in column " + 
chunk.getDescriptor().getCol());
+          }
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);
+          if (parquetFileReader.options.usePageChecksumVerification() && 
pageHeader.isSetCrc()) {
+            chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+              "could not verify dictionary page integrity, CRC checksum 
verification failed");
+          }
+          DictionaryPageHeader dicHeader = 
pageHeader.getDictionary_page_header();
+          DictionaryPage compressedDictionaryPage =
+            new DictionaryPage(
+              pageBytes,
+              uncompressedPageSize,
+              dicHeader.getNum_values(),
+              parquetFileReader.converter.getEncoding(dicHeader.getEncoding())
+            );
+          // Copy crc to new page, used for testing
+          if (pageHeader.isSetCrc()) {
+            compressedDictionaryPage.setCrc(pageHeader.getCrc());
+          }
+          dictionaryPage = compressedDictionaryPage;
+          break;
+        case DATA_PAGE:
+          DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);

Review Comment:
   In the current implementation, the `ioThreadPool` tasks are submitted first and the
   `processThreadPool` tasks are submitted afterwards. If we merge these two thread pools
   into a single one, problems can arise from the non-deterministic execution order in the
   ExecutorService: if the tasks previously submitted to `processThreadPool` start running
   first and exhaust all the threads in the pool, they will be unable to proceed because
   none of the I/O tasks can get an available thread. Please correct me if I am wrong.
   Thanks! @parthchandra



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import 
org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.BlockCipher.Decryptor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader.Chunk;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the reading of a single page.
+ */
+public class FilePageReader implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FilePageReader.class);
+
+  private final ParquetFileReader parquetFileReader;
+  private final Chunk chunk;
+  private final int currentBlock;
+  private final BlockCipher.Decryptor headerBlockDecryptor;
+  private final BlockCipher.Decryptor pageBlockDecryptor;
+  private final byte[] aadPrefix;
+  private final int rowGroupOrdinal;
+  private final int columnOrdinal;
+
+  //state
+  private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new 
LinkedBlockingDeque<>();
+  private DictionaryPage dictionaryPage = null;
+  private int pageIndex = 0;
+  private long valuesCountReadSoFar = 0;
+  private int dataPageCountReadSoFar = 0;
+
+  // derived
+  private final PrimitiveType type;
+  private final byte[] dataPageAAD;
+  private byte[] dataPageHeaderAAD = null;
+
+  private final BytesInputDecompressor decompressor;
+
+  private final ConcurrentLinkedQueue<Future<Void>> readFutures = new 
ConcurrentLinkedQueue<>();
+
+  private final LongAdder totalTimeReadOnePage = new LongAdder();
+  private final LongAdder totalCountReadOnePage = new LongAdder();
+  private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+  private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+  private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+  private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+  public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int 
currentBlock,
+    Decryptor headerBlockDecryptor,
+    Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+    BytesInputDecompressor decompressor
+  ) {
+    this.parquetFileReader = parquetFileReader;
+    this.chunk = chunk;
+    this.currentBlock = currentBlock;
+    this.headerBlockDecryptor = headerBlockDecryptor;
+    this.pageBlockDecryptor = pageBlockDecryptor;
+    this.aadPrefix = aadPrefix;
+    this.rowGroupOrdinal = rowGroupOrdinal;
+    this.columnOrdinal = columnOrdinal;
+    this.decompressor = decompressor;
+
+    this.type = parquetFileReader.getFileMetaData().getSchema()
+      .getType(chunk.getDescriptor().getCol().getPath()).asPrimitiveType();
+
+    if (null != headerBlockDecryptor) {
+      dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+        rowGroupOrdinal,
+        columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+    }
+    if (null != pageBlockDecryptor) {
+      dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, 
rowGroupOrdinal,
+        columnOrdinal, 0);
+    } else {
+      dataPageAAD = null;
+    }
+  }
+
+  public DictionaryPage getDictionaryPage() {
+    return this.dictionaryPage;
+  }
+
+  public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk() {
+    return this.pagesInChunk;
+  }
+
+  void readAllRemainingPagesAsync() {
+    readFutures.offer(ParquetFileReader.processThreadPool.submit(new 
FilePageReaderTask(this)));
+  }
+
+  void readAllRemainingPages() throws IOException {
+    while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+      readOnePage();
+    }
+    if (chunk.offsetIndex == null
+      && valuesCountReadSoFar != 
chunk.getDescriptor().getMetadata().getValueCount()) {
+      // Would be nice to have a CorruptParquetFileException or something as a 
subclass?
+      throw new IOException(
+        "Expected " + chunk.getDescriptor().getMetadata().getValueCount()
+          + " values in column chunk at " +
+          parquetFileReader.getPath() + " offset "
+          + chunk.descriptor.getMetadata().getFirstDataPageOffset() +
+          " but got " + valuesCountReadSoFar + " values instead over " + 
pagesInChunk.size()
+          + " pages ending at file offset " + 
(chunk.getDescriptor().getFileOffset()
+          + chunk.stream.position()));
+    }
+    try {
+      pagesInChunk.put(Optional.empty()); // add a marker for end of data
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while reading page data", e);
+    }
+  }
+
+  void readOnePage() throws IOException {
+    long startRead = System.nanoTime();
+    try {
+      byte[] pageHeaderAAD = dataPageHeaderAAD;
+      if (null != headerBlockDecryptor) {
+        // Important: this verifies file integrity (makes sure dictionary page 
had not been removed)
+        if (null == dictionaryPage && 
chunk.getDescriptor().getMetadata().hasDictionaryPage()) {
+          pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPageHeader,
+            rowGroupOrdinal, columnOrdinal, -1);
+        } else {
+          int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+          AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+        }
+      }
+      PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, 
pageHeaderAAD);
+      int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      final BytesInput pageBytes;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          // there is only one dictionary page per column chunk
+          if (dictionaryPage != null) {
+            throw new ParquetDecodingException(
+              "more than one dictionary page in column " + 
chunk.getDescriptor().getCol());
+          }
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);
+          if (parquetFileReader.options.usePageChecksumVerification() && 
pageHeader.isSetCrc()) {
+            chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+              "could not verify dictionary page integrity, CRC checksum 
verification failed");
+          }
+          DictionaryPageHeader dicHeader = 
pageHeader.getDictionary_page_header();
+          DictionaryPage compressedDictionaryPage =
+            new DictionaryPage(
+              pageBytes,
+              uncompressedPageSize,
+              dicHeader.getNum_values(),
+              parquetFileReader.converter.getEncoding(dicHeader.getEncoding())
+            );
+          // Copy crc to new page, used for testing
+          if (pageHeader.isSetCrc()) {
+            compressedDictionaryPage.setCrc(pageHeader.getCrc());
+          }
+          dictionaryPage = compressedDictionaryPage;
+          break;
+        case DATA_PAGE:
+          DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);

Review Comment:
   IIUC, the task in the `processThreadPool` is blocked here waiting for the
   completion of a task in the `ioThreadPool`.





> Implement async IO for Parquet file reader
> ------------------------------------------
>
>                 Key: PARQUET-2149
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2149
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>            Reporter: Parth Chandra
>            Priority: Major
>
> ParquetFileReader's implementation has the following (simplified) flow:
>       - For every column -> read from storage in 8MB blocks -> read all
> uncompressed pages into an output queue
>       - From the output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked until
> the data has been read. Because a large part of the time is spent waiting for
> data from storage, threads are idle and CPU utilization is really low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So for
> column _i_ -> read one chunk until the end, from storage -> intermediate
> output queue -> read one uncompressed page until the end -> output queue ->
> (downstream) decompression + decoding
> Note that this can be made completely self-contained in ParquetFileReader, and
> downstream implementations like Iceberg and Spark will automatically be able
> to take advantage without code changes, as long as the ParquetFileReader APIs
> are not changed.
> In past work with async I/O ([Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
> I have seen 2x-3x improvements in reading speed for Parquet files.
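
For illustration, a minimal sketch (hypothetical types and pool sizes, not the actual parquet-mr classes) of the proposed per-column pipeline: an I/O task produces pages into a blocking queue and a downstream task consumes them for decompression and decoding, using `Optional.empty()` as the end-of-data marker in the same way the PR's `pagesInChunk` does:

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;

public class AsyncColumnPipelineSketch {
  public static void main(String[] args) throws Exception {
    BlockingQueue<Optional<byte[]>> pagesInChunk = new LinkedBlockingDeque<>();
    ExecutorService ioPool = Executors.newFixedThreadPool(1);      // reads from storage
    ExecutorService processPool = Executors.newFixedThreadPool(1); // decompresses + decodes

    // Producer: reads compressed pages for one column chunk from storage.
    ioPool.submit(() -> {
      for (int page = 0; page < 3; page++) {
        pagesInChunk.put(Optional.of(new byte[1024])); // simulated compressed page
      }
      pagesInChunk.put(Optional.empty()); // end-of-data marker
      return null;
    });

    // Consumer: processes pages as soon as they arrive instead of waiting for the whole chunk.
    Future<Integer> decoded = processPool.submit(() -> {
      int count = 0;
      for (Optional<byte[]> page = pagesInChunk.take(); page.isPresent(); page = pagesInChunk.take()) {
        count++; // decompression + decoding would happen here
      }
      return count;
    });

    System.out.println("pages decoded: " + decoded.get());
    ioPool.shutdown();
    processPool.shutdown();
  }
}
```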



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
