[ https://issues.apache.org/jira/browse/DRILL-5356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947292#comment-15947292 ]

ASF GitHub Bot commented on DRILL-5356:
---------------------------------------

Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/789#discussion_r108692365
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java ---
    @@ -0,0 +1,157 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet.columnreaders;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.parquet.ParquetReaderStats;
    +import org.apache.drill.exec.vector.NullableIntVector;
    +import org.apache.drill.exec.vector.ValueVector;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.hadoop.metadata.BlockMetaData;
    +
    +/**
    + * Internal state for reading from a Parquet file.
    + */
    +
    +public class ReadState {
    +  private final ParquetSchema schema;
    +  ParquetReaderStats parquetReaderStats;
    +  private VarLenBinaryReader varLengthReader;
    +  // For columns not found in the file, we need to return a schema element with the correct number of values
    +  // at that position in the schema. Currently this requires that a vector be present. Here is a list of all of these
    +  // vectors that need only have their value count set at the end of each call to next(), as the values default to null.
    +  private List<NullableIntVector> nullFilledVectors;
    +  // Keeps track of the number of records returned in the case where only columns outside of the file were selected.
    +  // No actual data needs to be read out of the file; we only need to return batches until we have 'read' the number
    +  // of records specified in the row group metadata.
    +  long mockRecordsRead;
    +  private List<ColumnReader<?>> columnStatuses = new ArrayList<>();
    +  private long numRecordsToRead; // number of records to read
    +  private long totalRecordsRead;
    +  boolean useAsyncColReader;
    +
    +  public ReadState(ParquetSchema schema, ParquetReaderStats parquetReaderStats, long numRecordsToRead, boolean useAsyncColReader) {
    +    this.schema = schema;
    +    this.parquetReaderStats = parquetReaderStats;
    +    this.useAsyncColReader = useAsyncColReader;
    +    if (! schema.isStarQuery()) {
    +      nullFilledVectors = new ArrayList<>();
    +    }
    +    mockRecordsRead = 0;
    +    // Callers can pass -1 if they want to read all rows.
    +    if (numRecordsToRead == ParquetRecordReader.NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
    +      this.numRecordsToRead = schema.rowCount();
    +    } else {
    +      assert (numRecordsToRead >= 0);
    +      this.numRecordsToRead = Math.min(numRecordsToRead, schema.rowCount());
    +    }
    +  }
    +
    +  @SuppressWarnings("unchecked")
    +  public void buildReader(ParquetRecordReader reader, OutputMutator output) throws Exception {
    +    final ArrayList<VarLengthColumn<? extends ValueVector>> varLengthColumns = new ArrayList<>();
    +    // initialize all of the column read status objects
    +    BlockMetaData rowGroupMetadata = schema.getRowGroupMetadata();
    +    Map<String, Integer> columnChunkMetadataPositionsInList = schema.buildChunkMap(rowGroupMetadata);
    +    for (ParquetColumnMetadata colMd : schema.getColumnMetadata()) {
    +      ColumnDescriptor column = colMd.column;
    +      colMd.columnChunkMetaData = rowGroupMetadata.getColumns().get(
    +          columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath())));
    +      colMd.buildVector(output);
    +      if (! colMd.isFixedLength()) {
    +        // create a reader and add it to the appropriate list
    +        varLengthColumns.add(colMd.makeVariableWidthReader(reader));
    +      } else if (colMd.isRepeated()) {
    +        varLengthColumns.add(colMd.makeRepeatedFixedWidthReader(reader, schema.getRecordsPerBatch()));
    +      } else {
    +        columnStatuses.add(colMd.makeFixedWidthReader(reader, schema.getRecordsPerBatch()));
    +      }
    +    }
    +    varLengthReader = new VarLenBinaryReader(reader, varLengthColumns);
    +    if (! schema.isStarQuery()) {
    +      schema.createNonExistentColumns(output, nullFilledVectors);
    +    }
    +  }
    +
    +  public ColumnReader<?> getFirstColumnStatus() {
    +    if (columnStatuses.size() > 0) {
    +      return columnStatuses.get(0);
    +    } else if (varLengthReader.columns.size() > 0) {
    +      return varLengthReader.columns.get(0);
    +    } else {
    +      return null;
    +    }
    +  }
    +
    +  public void resetBatch() {
    +    for (final ColumnReader<?> column : columnStatuses) {
    +      column.valuesReadInCurrentPass = 0;
    +    }
    +    for (final VarLengthColumn<?> r : varLengthReader.columns) {
    +      r.valuesReadInCurrentPass = 0;
    +    }
    +  }
    +
    +  public ParquetSchema schema() { return schema; }
    +  public List<ColumnReader<?>> getReaders() { return columnStatuses; }
    --- End diff ---
    
    For clarity, should we rename this function to getColumnReaders()?
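    
    If we did, the accessor might look like the sketch below (a hypothetical illustration, not part of the diff above; the deprecated alias is just one way to keep existing callers compiling during the transition):
    
        // Hypothetical sketch of the suggested rename; not in the pull request.
        public List<ColumnReader<?>> getColumnReaders() { return columnStatuses; }
    
        // Optionally keep the old name as a deprecated alias so current
        // callers continue to compile until they migrate.
        @Deprecated
        public List<ColumnReader<?>> getReaders() { return getColumnReaders(); }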


> Refactor Parquet Record Reader
> ------------------------------
>
>                 Key: DRILL-5356
>                 URL: https://issues.apache.org/jira/browse/DRILL-5356
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>            Priority: Minor
>             Fix For: 1.11.0
>
>
> The Parquet record reader class is a key part of Drill that has evolved over 
> time to become somewhat hard to follow.
> A number of us are working on Parquet-related tasks and find we have to spend 
> an uncomfortable amount of time trying to understand the code. In particular, 
> this writer needs to figure out how to convince the reader to provide 
> higher-density record batches.
> Rather than continue to decipher the complex code multiple times, this ticket 
> requests to refactor the code to make it functionally identical, but 
> structurally cleaner. The result will be faster time to value when working 
> with this code.
> This is a lower-priority change and will be coordinated with others working 
> on this code base. This ticket is only for the record reader class itself; it 
> does not include the various readers and writers that Parquet uses since 
> another project is actively modifying those classes.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
