[ https://issues.apache.org/jira/browse/DRILL-5356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947292#comment-15947292 ]
ASF GitHub Bot commented on DRILL-5356: --------------------------------------- Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/789#discussion_r108692365 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.store.parquet.ParquetReaderStats; +import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.metadata.BlockMetaData; + +/** + * Internal state for reading from a Parquet file. + */ + +public class ReadState { + private final ParquetSchema schema; + ParquetReaderStats parquetReaderStats; + private VarLenBinaryReader varLengthReader; + // For columns not found in the file, we need to return a schema element with the correct number of values + // at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors + // that need only have their value count set at the end of each call to next(), as the values default to null. + private List<NullableIntVector> nullFilledVectors; + // Keeps track of the number of records returned in the case where only columns outside of the file were selected. + // No actual data needs to be read out of the file, we only need to return batches until we have 'read' the number of + // records specified in the row group metadata + long mockRecordsRead; + private List<ColumnReader<?>> columnStatuses = new ArrayList<>(); + private long numRecordsToRead; // number of records to read + private long totalRecordsRead; + boolean useAsyncColReader; + + public ReadState(ParquetSchema schema, ParquetReaderStats parquetReaderStats, long numRecordsToRead, boolean useAsyncColReader) { + this.schema = schema; + this.parquetReaderStats = parquetReaderStats; + this.useAsyncColReader = useAsyncColReader; + if (! schema.isStarQuery()) { + nullFilledVectors = new ArrayList<>(); + } + mockRecordsRead = 0; + // Callers can pass -1 if they want to read all rows. + if (numRecordsToRead == ParquetRecordReader.NUM_RECORDS_TO_READ_NOT_SPECIFIED) { + this.numRecordsToRead = schema.rowCount(); + } else { + assert (numRecordsToRead >= 0); + this.numRecordsToRead = Math.min(numRecordsToRead, schema.rowCount()); + } + } + + @SuppressWarnings("unchecked") + public void buildReader(ParquetRecordReader reader, OutputMutator output) throws Exception { + final ArrayList<VarLengthColumn<? extends ValueVector>> varLengthColumns = new ArrayList<>(); + // initialize all of the column read status objects + BlockMetaData rowGroupMetadata = schema.getRowGroupMetadata(); + Map<String, Integer> columnChunkMetadataPositionsInList = schema.buildChunkMap(rowGroupMetadata); + for (ParquetColumnMetadata colMd : schema.getColumnMetadata()) { + ColumnDescriptor column = colMd.column; + colMd.columnChunkMetaData = rowGroupMetadata.getColumns().get( + columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath()))); + colMd.buildVector(output); + if (! colMd.isFixedLength( )) { + // create a reader and add it to the appropriate list + varLengthColumns.add(colMd.makeVariableWidthReader(reader)); + } else if (colMd.isRepeated()) { + varLengthColumns.add(colMd.makeRepeatedFixedWidthReader(reader, schema.getRecordsPerBatch())); + } + else { + columnStatuses.add(colMd.makeFixedWidthReader(reader, schema.getRecordsPerBatch())); + } + } + varLengthReader = new VarLenBinaryReader(reader, varLengthColumns); + if (! schema.isStarQuery()) { + schema.createNonExistentColumns(output, nullFilledVectors); + } + } + + public ColumnReader<?> getFirstColumnStatus() { + if (columnStatuses.size() > 0) { + return columnStatuses.get(0); + } + else if (varLengthReader.columns.size() > 0) { + return varLengthReader.columns.get(0); + } else { + return null; + } + } + + public void resetBatch() { + for (final ColumnReader<?> column : columnStatuses) { + column.valuesReadInCurrentPass = 0; + } + for (final VarLengthColumn<?> r : varLengthReader.columns) { + r.valuesReadInCurrentPass = 0; + } + } + + public ParquetSchema schema() { return schema; } + public List<ColumnReader<?>> getReaders() { return columnStatuses; } --- End diff -- for clarity, should we rename this function getColumnReaders ? > Refactor Parquet Record Reader > ------------------------------ > > Key: DRILL-5356 > URL: https://issues.apache.org/jira/browse/DRILL-5356 > Project: Apache Drill > Issue Type: Improvement > Affects Versions: 1.10.0, 1.11.0 > Reporter: Paul Rogers > Assignee: Paul Rogers > Priority: Minor > Fix For: 1.11.0 > > > The Parquet record reader class is a key part of Drill that has evolved over > time to become somewhat hard to follow. > A number of us are working on Parquet-related tasks and find we have to spend > an uncomfortable amount of time trying to understand the code. In particular, > this writer needs to figure out how to convince the reader to provide > higher-density record batches. > Rather than continue to decypher the complex code multiple times, this ticket > requests to refactor the code to make it functionally identical, but > structurally cleaner. The result will be faster time to value when working > with this code. > This is a lower-priority change and will be coordinated with others working > on this code base. This ticket is only for the record reader class itself; it > does not include the various readers and writers that Parquet uses since > another project is actively modifying those classes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)