wgtmac commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1030605835


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import 
org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.BlockCipher.Decryptor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader.Chunk;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the reading of a single page.
+ */
+public class FilePageReader implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FilePageReader.class);
+
+  private final ParquetFileReader parquetFileReader;
+  private final Chunk chunk;
+  private final int currentBlock;
+  private final BlockCipher.Decryptor headerBlockDecryptor;
+  private final BlockCipher.Decryptor pageBlockDecryptor;
+  private final byte[] aadPrefix;
+  private final int rowGroupOrdinal;
+  private final int columnOrdinal;
+
+  //state
+  private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new 
LinkedBlockingDeque<>();
+  private DictionaryPage dictionaryPage = null;
+  private int pageIndex = 0;
+  private long valuesCountReadSoFar = 0;
+  private int dataPageCountReadSoFar = 0;
+
+  // derived
+  private final PrimitiveType type;
+  private final byte[] dataPageAAD;
+  private byte[] dataPageHeaderAAD = null;
+
+  private final BytesInputDecompressor decompressor;
+
+  private final ConcurrentLinkedQueue<Future<Void>> readFutures = new 
ConcurrentLinkedQueue<>();
+
+  private final LongAdder totalTimeReadOnePage = new LongAdder();
+  private final LongAdder totalCountReadOnePage = new LongAdder();
+  private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+  private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+  private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+  private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+  public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int 
currentBlock,
+    Decryptor headerBlockDecryptor,
+    Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+    BytesInputDecompressor decompressor
+  ) {
+    this.parquetFileReader = parquetFileReader;
+    this.chunk = chunk;
+    this.currentBlock = currentBlock;
+    this.headerBlockDecryptor = headerBlockDecryptor;
+    this.pageBlockDecryptor = pageBlockDecryptor;
+    this.aadPrefix = aadPrefix;
+    this.rowGroupOrdinal = rowGroupOrdinal;
+    this.columnOrdinal = columnOrdinal;
+    this.decompressor = decompressor;
+
+    this.type = parquetFileReader.getFileMetaData().getSchema()
+      .getType(chunk.getDescriptor().getCol().getPath()).asPrimitiveType();
+
+    if (null != headerBlockDecryptor) {
+      dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+        rowGroupOrdinal,
+        columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+    }
+    if (null != pageBlockDecryptor) {
+      dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, 
rowGroupOrdinal,
+        columnOrdinal, 0);
+    } else {
+      dataPageAAD = null;
+    }
+  }
+
+  public DictionaryPage getDictionaryPage() {
+    return this.dictionaryPage;
+  }
+
+  public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk() {
+    return this.pagesInChunk;
+  }
+
+  void readAllRemainingPagesAsync() {
+    readFutures.offer(ParquetFileReader.processThreadPool.submit(new 
FilePageReaderTask(this)));
+  }
+
+  void readAllRemainingPages() throws IOException {
+    while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+      readOnePage();
+    }
+    if (chunk.offsetIndex == null
+      && valuesCountReadSoFar != 
chunk.getDescriptor().getMetadata().getValueCount()) {
+      // Would be nice to have a CorruptParquetFileException or something as a 
subclass?
+      throw new IOException(
+        "Expected " + chunk.getDescriptor().getMetadata().getValueCount()
+          + " values in column chunk at " +
+          parquetFileReader.getPath() + " offset "
+          + chunk.descriptor.getMetadata().getFirstDataPageOffset() +
+          " but got " + valuesCountReadSoFar + " values instead over " + 
pagesInChunk.size()
+          + " pages ending at file offset " + 
(chunk.getDescriptor().getFileOffset()
+          + chunk.stream.position()));
+    }
+    try {
+      pagesInChunk.put(Optional.empty()); // add a marker for end of data
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while reading page data", e);
+    }
+  }
+
+  void readOnePage() throws IOException {
+    long startRead = System.nanoTime();
+    try {
+      byte[] pageHeaderAAD = dataPageHeaderAAD;
+      if (null != headerBlockDecryptor) {
+        // Important: this verifies file integrity (makes sure dictionary page 
had not been removed)
+        if (null == dictionaryPage && 
chunk.getDescriptor().getMetadata().hasDictionaryPage()) {
+          pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPageHeader,
+            rowGroupOrdinal, columnOrdinal, -1);
+        } else {
+          int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+          AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+        }
+      }
+      PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, 
pageHeaderAAD);
+      int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      final BytesInput pageBytes;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          // there is only one dictionary page per column chunk
+          if (dictionaryPage != null) {
+            throw new ParquetDecodingException(
+              "more than one dictionary page in column " + 
chunk.getDescriptor().getCol());
+          }
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);
+          if (parquetFileReader.options.usePageChecksumVerification() && 
pageHeader.isSetCrc()) {
+            chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+              "could not verify dictionary page integrity, CRC checksum 
verification failed");
+          }
+          DictionaryPageHeader dicHeader = 
pageHeader.getDictionary_page_header();
+          DictionaryPage compressedDictionaryPage =
+            new DictionaryPage(
+              pageBytes,
+              uncompressedPageSize,
+              dicHeader.getNum_values(),
+              parquetFileReader.converter.getEncoding(dicHeader.getEncoding())
+            );
+          // Copy crc to new page, used for testing
+          if (pageHeader.isSetCrc()) {
+            compressedDictionaryPage.setCrc(pageHeader.getCrc());
+          }
+          dictionaryPage = compressedDictionaryPage;
+          break;
+        case DATA_PAGE:
+          DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);

Review Comment:
   `chunk.readAsBytesInput(compressedPageSize)` is running inside 
`processThreadPool ` and what it does is as below:
   ```
       public BytesInput readAsBytesInput(int size) throws IOException {
         if (LOG.isDebugEnabled()) {
           String mode = (isAsyncIOReaderEnabled()) ? "ASYNC" : "SYNC";
           LOG.debug("{} READ BYTES INPUT: stream {}", mode, stream);
         }
         return BytesInput.from(stream.sliceBuffers(size));
       }
   ```
   
   The `stream.sliceBuffers(size)` is a blocking call to 
`SequenceByteBufferInputStream.sliceBuffers()` which calls 
`AsyncMultiBufferInputStream.sliceBuffers()` internally and waits for return of 
`AsyncMultiBufferInputStream.nextBuffer()`. Please note that 
`AsyncMultiBufferInputStream.nextBuffer()` is running in the `ioThreadPool `.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import 
org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.BlockCipher.Decryptor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader.Chunk;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the reading of a single page.
+ */
+public class FilePageReader implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FilePageReader.class);
+
+  private final ParquetFileReader parquetFileReader;
+  private final Chunk chunk;
+  private final int currentBlock;
+  private final BlockCipher.Decryptor headerBlockDecryptor;
+  private final BlockCipher.Decryptor pageBlockDecryptor;
+  private final byte[] aadPrefix;
+  private final int rowGroupOrdinal;
+  private final int columnOrdinal;
+
+  //state
+  private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new 
LinkedBlockingDeque<>();
+  private DictionaryPage dictionaryPage = null;
+  private int pageIndex = 0;
+  private long valuesCountReadSoFar = 0;
+  private int dataPageCountReadSoFar = 0;
+
+  // derived
+  private final PrimitiveType type;
+  private final byte[] dataPageAAD;
+  private byte[] dataPageHeaderAAD = null;
+
+  private final BytesInputDecompressor decompressor;
+
+  private final ConcurrentLinkedQueue<Future<Void>> readFutures = new 
ConcurrentLinkedQueue<>();
+
+  private final LongAdder totalTimeReadOnePage = new LongAdder();
+  private final LongAdder totalCountReadOnePage = new LongAdder();
+  private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+  private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+  private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+  private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+  public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int 
currentBlock,
+    Decryptor headerBlockDecryptor,
+    Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+    BytesInputDecompressor decompressor
+  ) {
+    this.parquetFileReader = parquetFileReader;
+    this.chunk = chunk;
+    this.currentBlock = currentBlock;
+    this.headerBlockDecryptor = headerBlockDecryptor;
+    this.pageBlockDecryptor = pageBlockDecryptor;
+    this.aadPrefix = aadPrefix;
+    this.rowGroupOrdinal = rowGroupOrdinal;
+    this.columnOrdinal = columnOrdinal;
+    this.decompressor = decompressor;
+
+    this.type = parquetFileReader.getFileMetaData().getSchema()
+      .getType(chunk.getDescriptor().getCol().getPath()).asPrimitiveType();
+
+    if (null != headerBlockDecryptor) {
+      dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+        rowGroupOrdinal,
+        columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+    }
+    if (null != pageBlockDecryptor) {
+      dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, 
rowGroupOrdinal,
+        columnOrdinal, 0);
+    } else {
+      dataPageAAD = null;
+    }
+  }
+
+  public DictionaryPage getDictionaryPage() {
+    return this.dictionaryPage;
+  }
+
+  public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk() {
+    return this.pagesInChunk;
+  }
+
+  void readAllRemainingPagesAsync() {
+    readFutures.offer(ParquetFileReader.processThreadPool.submit(new 
FilePageReaderTask(this)));
+  }
+
+  void readAllRemainingPages() throws IOException {
+    while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+      readOnePage();
+    }
+    if (chunk.offsetIndex == null
+      && valuesCountReadSoFar != 
chunk.getDescriptor().getMetadata().getValueCount()) {
+      // Would be nice to have a CorruptParquetFileException or something as a 
subclass?
+      throw new IOException(
+        "Expected " + chunk.getDescriptor().getMetadata().getValueCount()
+          + " values in column chunk at " +
+          parquetFileReader.getPath() + " offset "
+          + chunk.descriptor.getMetadata().getFirstDataPageOffset() +
+          " but got " + valuesCountReadSoFar + " values instead over " + 
pagesInChunk.size()
+          + " pages ending at file offset " + 
(chunk.getDescriptor().getFileOffset()
+          + chunk.stream.position()));
+    }
+    try {
+      pagesInChunk.put(Optional.empty()); // add a marker for end of data
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while reading page data", e);
+    }
+  }
+
+  void readOnePage() throws IOException {
+    long startRead = System.nanoTime();
+    try {
+      byte[] pageHeaderAAD = dataPageHeaderAAD;
+      if (null != headerBlockDecryptor) {
+        // Important: this verifies file integrity (makes sure dictionary page 
had not been removed)
+        if (null == dictionaryPage && 
chunk.getDescriptor().getMetadata().hasDictionaryPage()) {
+          pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPageHeader,
+            rowGroupOrdinal, columnOrdinal, -1);
+        } else {
+          int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+          AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+        }
+      }
+      PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, 
pageHeaderAAD);
+      int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      final BytesInput pageBytes;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          // there is only one dictionary page per column chunk
+          if (dictionaryPage != null) {
+            throw new ParquetDecodingException(
+              "more than one dictionary page in column " + 
chunk.getDescriptor().getCol());
+          }
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);
+          if (parquetFileReader.options.usePageChecksumVerification() && 
pageHeader.isSetCrc()) {
+            chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+              "could not verify dictionary page integrity, CRC checksum 
verification failed");
+          }
+          DictionaryPageHeader dicHeader = 
pageHeader.getDictionary_page_header();
+          DictionaryPage compressedDictionaryPage =
+            new DictionaryPage(
+              pageBytes,
+              uncompressedPageSize,
+              dicHeader.getNum_values(),
+              parquetFileReader.converter.getEncoding(dicHeader.getEncoding())
+            );
+          // Copy crc to new page, used for testing
+          if (pageHeader.isSetCrc()) {
+            compressedDictionaryPage.setCrc(pageHeader.getCrc());
+          }
+          dictionaryPage = compressedDictionaryPage;
+          break;
+        case DATA_PAGE:
+          DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);

Review Comment:
   In the current implementation, tasks of `ioThreadPool` are submitted first 
and then tasks of `processThreadPool` get submitted. If we merge these two 
thread pools into a single one, problem will happen due to uncertainty of the 
execution order in the ExecutorService. When the tasks that are previously 
submitted to `processThreadPool` are running first and have exhausted all the 
threads in the pool, they will be unable to proceed because none of the I/O 
tasks manage to get an available thread. Please correct me if I am wrong. 
Thanks! @parthchandra 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import 
org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.BlockCipher.Decryptor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader.Chunk;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the reading of a single page.
+ */
+public class FilePageReader implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FilePageReader.class);
+
+  private final ParquetFileReader parquetFileReader;
+  private final Chunk chunk;
+  private final int currentBlock;
+  private final BlockCipher.Decryptor headerBlockDecryptor;
+  private final BlockCipher.Decryptor pageBlockDecryptor;
+  private final byte[] aadPrefix;
+  private final int rowGroupOrdinal;
+  private final int columnOrdinal;
+
+  //state
+  private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new 
LinkedBlockingDeque<>();
+  private DictionaryPage dictionaryPage = null;
+  private int pageIndex = 0;
+  private long valuesCountReadSoFar = 0;
+  private int dataPageCountReadSoFar = 0;
+
+  // derived
+  private final PrimitiveType type;
+  private final byte[] dataPageAAD;
+  private byte[] dataPageHeaderAAD = null;
+
+  private final BytesInputDecompressor decompressor;
+
+  private final ConcurrentLinkedQueue<Future<Void>> readFutures = new 
ConcurrentLinkedQueue<>();
+
+  private final LongAdder totalTimeReadOnePage = new LongAdder();
+  private final LongAdder totalCountReadOnePage = new LongAdder();
+  private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+  private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+  private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+  private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+  public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int 
currentBlock,
+    Decryptor headerBlockDecryptor,
+    Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+    BytesInputDecompressor decompressor
+  ) {
+    this.parquetFileReader = parquetFileReader;
+    this.chunk = chunk;
+    this.currentBlock = currentBlock;
+    this.headerBlockDecryptor = headerBlockDecryptor;
+    this.pageBlockDecryptor = pageBlockDecryptor;
+    this.aadPrefix = aadPrefix;
+    this.rowGroupOrdinal = rowGroupOrdinal;
+    this.columnOrdinal = columnOrdinal;
+    this.decompressor = decompressor;
+
+    this.type = parquetFileReader.getFileMetaData().getSchema()
+      .getType(chunk.getDescriptor().getCol().getPath()).asPrimitiveType();
+
+    if (null != headerBlockDecryptor) {
+      dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+        rowGroupOrdinal,
+        columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+    }
+    if (null != pageBlockDecryptor) {
+      dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, 
rowGroupOrdinal,
+        columnOrdinal, 0);
+    } else {
+      dataPageAAD = null;
+    }
+  }
+
+  public DictionaryPage getDictionaryPage() {
+    return this.dictionaryPage;
+  }
+
+  public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk() {
+    return this.pagesInChunk;
+  }
+
+  void readAllRemainingPagesAsync() {
+    readFutures.offer(ParquetFileReader.processThreadPool.submit(new 
FilePageReaderTask(this)));
+  }
+
+  void readAllRemainingPages() throws IOException {
+    while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+      readOnePage();
+    }
+    if (chunk.offsetIndex == null
+      && valuesCountReadSoFar != 
chunk.getDescriptor().getMetadata().getValueCount()) {
+      // Would be nice to have a CorruptParquetFileException or something as a 
subclass?
+      throw new IOException(
+        "Expected " + chunk.getDescriptor().getMetadata().getValueCount()
+          + " values in column chunk at " +
+          parquetFileReader.getPath() + " offset "
+          + chunk.descriptor.getMetadata().getFirstDataPageOffset() +
+          " but got " + valuesCountReadSoFar + " values instead over " + 
pagesInChunk.size()
+          + " pages ending at file offset " + 
(chunk.getDescriptor().getFileOffset()
+          + chunk.stream.position()));
+    }
+    try {
+      pagesInChunk.put(Optional.empty()); // add a marker for end of data
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while reading page data", e);
+    }
+  }
+
+  void readOnePage() throws IOException {
+    long startRead = System.nanoTime();
+    try {
+      byte[] pageHeaderAAD = dataPageHeaderAAD;
+      if (null != headerBlockDecryptor) {
+        // Important: this verifies file integrity (makes sure dictionary page 
had not been removed)
+        if (null == dictionaryPage && 
chunk.getDescriptor().getMetadata().hasDictionaryPage()) {
+          pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPageHeader,
+            rowGroupOrdinal, columnOrdinal, -1);
+        } else {
+          int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+          AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+        }
+      }
+      PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, 
pageHeaderAAD);
+      int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      final BytesInput pageBytes;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          // there is only one dictionary page per column chunk
+          if (dictionaryPage != null) {
+            throw new ParquetDecodingException(
+              "more than one dictionary page in column " + 
chunk.getDescriptor().getCol());
+          }
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);
+          if (parquetFileReader.options.usePageChecksumVerification() && 
pageHeader.isSetCrc()) {
+            chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+              "could not verify dictionary page integrity, CRC checksum 
verification failed");
+          }
+          DictionaryPageHeader dicHeader = 
pageHeader.getDictionary_page_header();
+          DictionaryPage compressedDictionaryPage =
+            new DictionaryPage(
+              pageBytes,
+              uncompressedPageSize,
+              dicHeader.getNum_values(),
+              parquetFileReader.converter.getEncoding(dicHeader.getEncoding())
+            );
+          // Copy crc to new page, used for testing
+          if (pageHeader.isSetCrc()) {
+            compressedDictionaryPage.setCrc(pageHeader.getCrc());
+          }
+          dictionaryPage = compressedDictionaryPage;
+          break;
+        case DATA_PAGE:
+          DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);

Review Comment:
   IIUC, the task in the `processThreadPool` is blocked here to wait for the 
completion of task in the `ioThreadPool`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to