[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637846#comment-17637846 ]
ASF GitHub Bot commented on PARQUET-2149: ----------------------------------------- wgtmac commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1030605835 ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java: ########## @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; +import org.apache.parquet.crypto.AesCipher; +import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; +import org.apache.parquet.format.BlockCipher; +import org.apache.parquet.format.BlockCipher.Decryptor; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.hadoop.ParquetFileReader.Chunk; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates the reading of a single page. 
+ */ +public class FilePageReader implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(FilePageReader.class); + + private final ParquetFileReader parquetFileReader; + private final Chunk chunk; + private final int currentBlock; + private final BlockCipher.Decryptor headerBlockDecryptor; + private final BlockCipher.Decryptor pageBlockDecryptor; + private final byte[] aadPrefix; + private final int rowGroupOrdinal; + private final int columnOrdinal; + + //state + private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>(); + private DictionaryPage dictionaryPage = null; + private int pageIndex = 0; + private long valuesCountReadSoFar = 0; + private int dataPageCountReadSoFar = 0; + + // derived + private final PrimitiveType type; + private final byte[] dataPageAAD; + private byte[] dataPageHeaderAAD = null; + + private final BytesInputDecompressor decompressor; + + private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>(); + + private final LongAdder totalTimeReadOnePage = new LongAdder(); + private final LongAdder totalCountReadOnePage = new LongAdder(); + private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L); + private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder(); + private final LongAdder totalCountBlockedPagesInChunk = new LongAdder(); + private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L); + + public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int currentBlock, + Decryptor headerBlockDecryptor, + Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal, + BytesInputDecompressor decompressor + ) { + this.parquetFileReader = parquetFileReader; + this.chunk = chunk; + this.currentBlock = currentBlock; + this.headerBlockDecryptor = headerBlockDecryptor; + this.pageBlockDecryptor = pageBlockDecryptor; + this.aadPrefix = 
aadPrefix; + this.rowGroupOrdinal = rowGroupOrdinal; + this.columnOrdinal = columnOrdinal; + this.decompressor = decompressor; + + this.type = parquetFileReader.getFileMetaData().getSchema() + .getType(chunk.getDescriptor().getCol().getPath()).asPrimitiveType(); + + if (null != headerBlockDecryptor) { + dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, + rowGroupOrdinal, + columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar)); + } + if (null != pageBlockDecryptor) { + dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal, + columnOrdinal, 0); + } else { + dataPageAAD = null; + } + } + + public DictionaryPage getDictionaryPage() { + return this.dictionaryPage; + } + + public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk() { + return this.pagesInChunk; + } + + void readAllRemainingPagesAsync() { + readFutures.offer(ParquetFileReader.processThreadPool.submit(new FilePageReaderTask(this))); + } + + void readAllRemainingPages() throws IOException { + while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) { + readOnePage(); + } + if (chunk.offsetIndex == null + && valuesCountReadSoFar != chunk.getDescriptor().getMetadata().getValueCount()) { + // Would be nice to have a CorruptParquetFileException or something as a subclass? 
+ throw new IOException( + "Expected " + chunk.getDescriptor().getMetadata().getValueCount() + + " values in column chunk at " + + parquetFileReader.getPath() + " offset " + + chunk.descriptor.getMetadata().getFirstDataPageOffset() + + " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size() + + " pages ending at file offset " + (chunk.getDescriptor().getFileOffset() + + chunk.stream.position())); + } + try { + pagesInChunk.put(Optional.empty()); // add a marker for end of data + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while reading page data", e); + } + } + + void readOnePage() throws IOException { + long startRead = System.nanoTime(); + try { + byte[] pageHeaderAAD = dataPageHeaderAAD; + if (null != headerBlockDecryptor) { + // Important: this verifies file integrity (makes sure dictionary page had not been removed) + if (null == dictionaryPage && chunk.getDescriptor().getMetadata().hasDictionaryPage()) { + pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader, + rowGroupOrdinal, columnOrdinal, -1); + } else { + int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar); + AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal); + } + } + PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, pageHeaderAAD); + int uncompressedPageSize = pageHeader.getUncompressed_page_size(); + int compressedPageSize = pageHeader.getCompressed_page_size(); + final BytesInput pageBytes; + switch (pageHeader.type) { + case DICTIONARY_PAGE: + // there is only one dictionary page per column chunk + if (dictionaryPage != null) { + throw new ParquetDecodingException( + "more than one dictionary page in column " + chunk.getDescriptor().getCol()); + } + pageBytes = chunk.readAsBytesInput(compressedPageSize); + if (parquetFileReader.options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + chunk.verifyCrc(pageHeader.getCrc(), 
pageBytes.toByteArray(), + "could not verify dictionary page integrity, CRC checksum verification failed"); + } + DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); + DictionaryPage compressedDictionaryPage = + new DictionaryPage( + pageBytes, + uncompressedPageSize, + dicHeader.getNum_values(), + parquetFileReader.converter.getEncoding(dicHeader.getEncoding()) + ); + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + compressedDictionaryPage.setCrc(pageHeader.getCrc()); + } + dictionaryPage = compressedDictionaryPage; + break; + case DATA_PAGE: + DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); + pageBytes = chunk.readAsBytesInput(compressedPageSize); Review Comment: `chunk.readAsBytesInput(compressedPageSize)` is running inside `processThreadPool ` and what it does is as below: ``` public BytesInput readAsBytesInput(int size) throws IOException { if (LOG.isDebugEnabled()) { String mode = (isAsyncIOReaderEnabled()) ? "ASYNC" : "SYNC"; LOG.debug("{} READ BYTES INPUT: stream {}", mode, stream); } return BytesInput.from(stream.sliceBuffers(size)); } ``` The `stream.sliceBuffers(size)` is a blocking call to `SequenceByteBufferInputStream.sliceBuffers()` which calls `AsyncMultiBufferInputStream.sliceBuffers()` internally and waits for return of `AsyncMultiBufferInputStream.nextBuffer()`. Please note that `AsyncMultiBufferInputStream.nextBuffer()` is running in the `ioThreadPool `. ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java: ########## @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; +import org.apache.parquet.crypto.AesCipher; +import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; +import org.apache.parquet.format.BlockCipher; +import org.apache.parquet.format.BlockCipher.Decryptor; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.hadoop.ParquetFileReader.Chunk; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates the reading of a single page. 
+ */ +public class FilePageReader implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(FilePageReader.class); + + private final ParquetFileReader parquetFileReader; + private final Chunk chunk; + private final int currentBlock; + private final BlockCipher.Decryptor headerBlockDecryptor; + private final BlockCipher.Decryptor pageBlockDecryptor; + private final byte[] aadPrefix; + private final int rowGroupOrdinal; + private final int columnOrdinal; + + //state + private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>(); + private DictionaryPage dictionaryPage = null; + private int pageIndex = 0; + private long valuesCountReadSoFar = 0; + private int dataPageCountReadSoFar = 0; + + // derived + private final PrimitiveType type; + private final byte[] dataPageAAD; + private byte[] dataPageHeaderAAD = null; + + private final BytesInputDecompressor decompressor; + + private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>(); + + private final LongAdder totalTimeReadOnePage = new LongAdder(); + private final LongAdder totalCountReadOnePage = new LongAdder(); + private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L); + private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder(); + private final LongAdder totalCountBlockedPagesInChunk = new LongAdder(); + private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L); + + public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int currentBlock, + Decryptor headerBlockDecryptor, + Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal, + BytesInputDecompressor decompressor + ) { + this.parquetFileReader = parquetFileReader; + this.chunk = chunk; + this.currentBlock = currentBlock; + this.headerBlockDecryptor = headerBlockDecryptor; + this.pageBlockDecryptor = pageBlockDecryptor; + this.aadPrefix = 
aadPrefix; + this.rowGroupOrdinal = rowGroupOrdinal; + this.columnOrdinal = columnOrdinal; + this.decompressor = decompressor; + + this.type = parquetFileReader.getFileMetaData().getSchema() + .getType(chunk.getDescriptor().getCol().getPath()).asPrimitiveType(); + + if (null != headerBlockDecryptor) { + dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, + rowGroupOrdinal, + columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar)); + } + if (null != pageBlockDecryptor) { + dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal, + columnOrdinal, 0); + } else { + dataPageAAD = null; + } + } + + public DictionaryPage getDictionaryPage() { + return this.dictionaryPage; + } + + public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk() { + return this.pagesInChunk; + } + + void readAllRemainingPagesAsync() { + readFutures.offer(ParquetFileReader.processThreadPool.submit(new FilePageReaderTask(this))); + } + + void readAllRemainingPages() throws IOException { + while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) { + readOnePage(); + } + if (chunk.offsetIndex == null + && valuesCountReadSoFar != chunk.getDescriptor().getMetadata().getValueCount()) { + // Would be nice to have a CorruptParquetFileException or something as a subclass? 
+ throw new IOException( + "Expected " + chunk.getDescriptor().getMetadata().getValueCount() + + " values in column chunk at " + + parquetFileReader.getPath() + " offset " + + chunk.descriptor.getMetadata().getFirstDataPageOffset() + + " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size() + + " pages ending at file offset " + (chunk.getDescriptor().getFileOffset() + + chunk.stream.position())); + } + try { + pagesInChunk.put(Optional.empty()); // add a marker for end of data + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while reading page data", e); + } + } + + void readOnePage() throws IOException { + long startRead = System.nanoTime(); + try { + byte[] pageHeaderAAD = dataPageHeaderAAD; + if (null != headerBlockDecryptor) { + // Important: this verifies file integrity (makes sure dictionary page had not been removed) + if (null == dictionaryPage && chunk.getDescriptor().getMetadata().hasDictionaryPage()) { + pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader, + rowGroupOrdinal, columnOrdinal, -1); + } else { + int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar); + AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal); + } + } + PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, pageHeaderAAD); + int uncompressedPageSize = pageHeader.getUncompressed_page_size(); + int compressedPageSize = pageHeader.getCompressed_page_size(); + final BytesInput pageBytes; + switch (pageHeader.type) { + case DICTIONARY_PAGE: + // there is only one dictionary page per column chunk + if (dictionaryPage != null) { + throw new ParquetDecodingException( + "more than one dictionary page in column " + chunk.getDescriptor().getCol()); + } + pageBytes = chunk.readAsBytesInput(compressedPageSize); + if (parquetFileReader.options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + chunk.verifyCrc(pageHeader.getCrc(), 
pageBytes.toByteArray(), + "could not verify dictionary page integrity, CRC checksum verification failed"); + } + DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); + DictionaryPage compressedDictionaryPage = + new DictionaryPage( + pageBytes, + uncompressedPageSize, + dicHeader.getNum_values(), + parquetFileReader.converter.getEncoding(dicHeader.getEncoding()) + ); + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + compressedDictionaryPage.setCrc(pageHeader.getCrc()); + } + dictionaryPage = compressedDictionaryPage; + break; + case DATA_PAGE: + DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); + pageBytes = chunk.readAsBytesInput(compressedPageSize); Review Comment: In the current implementation, tasks of `ioThreadPool` are submitted first and then tasks of `processThreadPool` get submitted. If we merge these two thread pools into a single one, problem will happen due to uncertainty of the execution order in the ExecutorService. When the tasks that are previously submitted to `processThreadPool` are running first and have exhausted all the threads in the pool, they will be unable to proceed because none of the I/O tasks manage to get an available thread. Please correct me if I am wrong. Thanks! @parthchandra ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java: ########## @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; +import org.apache.parquet.crypto.AesCipher; +import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; +import org.apache.parquet.format.BlockCipher; +import org.apache.parquet.format.BlockCipher.Decryptor; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.hadoop.ParquetFileReader.Chunk; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates the reading of a single page. 
+ */ +public class FilePageReader implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(FilePageReader.class); + + private final ParquetFileReader parquetFileReader; + private final Chunk chunk; + private final int currentBlock; + private final BlockCipher.Decryptor headerBlockDecryptor; + private final BlockCipher.Decryptor pageBlockDecryptor; + private final byte[] aadPrefix; + private final int rowGroupOrdinal; + private final int columnOrdinal; + + //state + private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>(); + private DictionaryPage dictionaryPage = null; + private int pageIndex = 0; + private long valuesCountReadSoFar = 0; + private int dataPageCountReadSoFar = 0; + + // derived + private final PrimitiveType type; + private final byte[] dataPageAAD; + private byte[] dataPageHeaderAAD = null; + + private final BytesInputDecompressor decompressor; + + private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>(); + + private final LongAdder totalTimeReadOnePage = new LongAdder(); + private final LongAdder totalCountReadOnePage = new LongAdder(); + private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L); + private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder(); + private final LongAdder totalCountBlockedPagesInChunk = new LongAdder(); + private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L); + + public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int currentBlock, + Decryptor headerBlockDecryptor, + Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal, + BytesInputDecompressor decompressor + ) { + this.parquetFileReader = parquetFileReader; + this.chunk = chunk; + this.currentBlock = currentBlock; + this.headerBlockDecryptor = headerBlockDecryptor; + this.pageBlockDecryptor = pageBlockDecryptor; + this.aadPrefix = 
aadPrefix; + this.rowGroupOrdinal = rowGroupOrdinal; + this.columnOrdinal = columnOrdinal; + this.decompressor = decompressor; + + this.type = parquetFileReader.getFileMetaData().getSchema() + .getType(chunk.getDescriptor().getCol().getPath()).asPrimitiveType(); + + if (null != headerBlockDecryptor) { + dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, + rowGroupOrdinal, + columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar)); + } + if (null != pageBlockDecryptor) { + dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal, + columnOrdinal, 0); + } else { + dataPageAAD = null; + } + } + + public DictionaryPage getDictionaryPage() { + return this.dictionaryPage; + } + + public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk() { + return this.pagesInChunk; + } + + void readAllRemainingPagesAsync() { + readFutures.offer(ParquetFileReader.processThreadPool.submit(new FilePageReaderTask(this))); + } + + void readAllRemainingPages() throws IOException { + while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) { + readOnePage(); + } + if (chunk.offsetIndex == null + && valuesCountReadSoFar != chunk.getDescriptor().getMetadata().getValueCount()) { + // Would be nice to have a CorruptParquetFileException or something as a subclass? 
+ throw new IOException( + "Expected " + chunk.getDescriptor().getMetadata().getValueCount() + + " values in column chunk at " + + parquetFileReader.getPath() + " offset " + + chunk.descriptor.getMetadata().getFirstDataPageOffset() + + " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size() + + " pages ending at file offset " + (chunk.getDescriptor().getFileOffset() + + chunk.stream.position())); + } + try { + pagesInChunk.put(Optional.empty()); // add a marker for end of data + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while reading page data", e); + } + } + + void readOnePage() throws IOException { + long startRead = System.nanoTime(); + try { + byte[] pageHeaderAAD = dataPageHeaderAAD; + if (null != headerBlockDecryptor) { + // Important: this verifies file integrity (makes sure dictionary page had not been removed) + if (null == dictionaryPage && chunk.getDescriptor().getMetadata().hasDictionaryPage()) { + pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader, + rowGroupOrdinal, columnOrdinal, -1); + } else { + int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar); + AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal); + } + } + PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, pageHeaderAAD); + int uncompressedPageSize = pageHeader.getUncompressed_page_size(); + int compressedPageSize = pageHeader.getCompressed_page_size(); + final BytesInput pageBytes; + switch (pageHeader.type) { + case DICTIONARY_PAGE: + // there is only one dictionary page per column chunk + if (dictionaryPage != null) { + throw new ParquetDecodingException( + "more than one dictionary page in column " + chunk.getDescriptor().getCol()); + } + pageBytes = chunk.readAsBytesInput(compressedPageSize); + if (parquetFileReader.options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + chunk.verifyCrc(pageHeader.getCrc(), 
pageBytes.toByteArray(), + "could not verify dictionary page integrity, CRC checksum verification failed"); + } + DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); + DictionaryPage compressedDictionaryPage = + new DictionaryPage( + pageBytes, + uncompressedPageSize, + dicHeader.getNum_values(), + parquetFileReader.converter.getEncoding(dicHeader.getEncoding()) + ); + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + compressedDictionaryPage.setCrc(pageHeader.getCrc()); + } + dictionaryPage = compressedDictionaryPage; + break; + case DATA_PAGE: + DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); + pageBytes = chunk.readAsBytesInput(compressedPageSize); Review Comment: IIUC, the task in the `processThreadPool` is blocked here to wait for the completion of task in the `ioThreadPool`. > Implement async IO for Parquet file reader > ------------------------------------------ > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr > Reporter: Parth Chandra > Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. 
So: > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream) decompression + decoding > Note that this can be made completely self-contained in ParquetFileReader, and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code changes as long as the ParquetFileReader APIs > are not changed. > In past work with async I/O > [Drill - async page reader|https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java], > I have seen a 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.10#820010)