ggershinsky commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903693351
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state
+    private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+    private DictionaryPage dictionaryPage = null;
+    private int pageIndex = 0;
+    private long valuesCountReadSoFar = 0;
+    private int dataPageCountReadSoFar = 0;
+
+    // derived
+    private final PrimitiveType type;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
+    private byte[] dataPageHeaderAAD = null;
+
+    private final BytesInputDecompressor decompressor;
+
+    private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+    private final LongAdder totalTimeReadOnePage = new LongAdder();
+    private final LongAdder totalCountReadOnePage = new LongAdder();
+    private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L);
+    private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+    private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+    private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L);
+
+    public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor,
+      Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal,
+      BytesInputDecompressor decompressor
+    ) {
+      this.chunk = chunk;
+      this.currentBlock = currentBlock;
+      this.headerBlockDecryptor = headerBlockDecryptor;
+      this.pageBlockDecryptor = pageBlockDecryptor;
+      this.aadPrefix = aadPrefix;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.decompressor = decompressor;
+
+      this.type = getFileMetaData().getSchema()
+        .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+      if (null != headerBlockDecryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader,
+          rowGroupOrdinal,
+          columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+      }
+      if (null != pageBlockDecryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal,
+          columnOrdinal, 0);
+        dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPage,

Review Comment:
   Yep, the `dictionaryPageAAD` is not necessary here. This is a significant code change, more than just moving the current logic of
   ```java
   public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor headerBlockDecryptor,
       BlockCipher.Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal,
       int columnOrdinal)
   ```
   I'll have a closer look at the details, but we need a unit test (proposed in my other comment) to make sure decryption works correctly with async I/O and parallel column reading.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org