[ 
https://issues.apache.org/jira/browse/DRILL-5356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16025435#comment-16025435
 ] 

ASF GitHub Bot commented on DRILL-5356:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/789#discussion_r118591078
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
    @@ -308,163 +232,50 @@ public FragmentContext getFragmentContext() {
       }
     
       /**
    -   * Returns data type length for a given {@see ColumnDescriptor} and it's 
corresponding
    -   * {@see SchemaElement}. Neither is enough information alone as the max
    -   * repetition level (indicating if it is an array type) is in the 
ColumnDescriptor and
    -   * the length of a fixed width field is stored at the schema level.
    -   *
    -   * @return the length if fixed width, else -1
    +   * Prepare the Parquet reader. First determine the set of columns to 
read (the schema
    +   * for this read.) Then, create a state object to track the read across 
calls to
    +   * the reader <tt>next()</tt> method. Finally, create one of three 
readers to
    +   * read batches depending on whether this scan is for only fixed-width 
fields,
    +   * contains at least one variable-width field, or is a "mock" scan 
consisting
    +   * only of null fields (fields in the SELECT clause but not in the 
Parquet file.)
        */
    -  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) 
{
    -    if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
    -      if (column.getMaxRepetitionLevel() > 0) {
    -        return -1;
    -      }
    -      if (column.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
    -        return se.getType_length() * 8;
    -      } else {
    -        return getTypeLengthInBits(column.getType());
    -      }
    -    } else {
    -      return -1;
    -    }
    -  }
     
    -  @SuppressWarnings({ "resource", "unchecked" })
       @Override
       public void setup(OperatorContext operatorContext, OutputMutator output) 
throws ExecutionSetupException {
         this.operatorContext = operatorContext;
    -    if (!isStarQuery()) {
    -      columnsFound = new boolean[getColumns().size()];
    -      nullFilledVectors = new ArrayList<>();
    -    }
    -    columnStatuses = new ArrayList<>();
    -    List<ColumnDescriptor> columns = 
footer.getFileMetaData().getSchema().getColumns();
    -    allFieldsFixedLength = true;
    -    ColumnDescriptor column;
    -    ColumnChunkMetaData columnChunkMetaData;
    -    int columnsToScan = 0;
    -    mockRecordsRead = 0;
    -
    -    MaterializedField field;
    +    schema = new ParquetSchema(fragmentContext.getOptions(), 
rowGroupIndex, isStarQuery() ? null : getColumns());
     
         logger.debug("Reading row group({}) with {} records in file {}.", 
rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
             hadoopPath.toUri().getPath());
    -    totalRecordsRead = 0;
    -
    -    // TODO - figure out how to deal with this better once we add nested 
reading, note also look where this map is used below
    -    // store a map from column name to converted types if they are non-null
    -    Map<String, SchemaElement> schemaElements = 
ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
    -
    -    // loop to add up the length of the fixed width columns and build the 
schema
    -    for (int i = 0; i < columns.size(); ++i) {
    -      column = columns.get(i);
    -      SchemaElement se = schemaElements.get(column.getPath()[0]);
    -      MajorType mt = 
ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
    -          getDataMode(column), se, fragmentContext.getOptions());
    -      field = MaterializedField.create(toFieldName(column.getPath()), mt);
    -      if ( ! fieldSelected(field)) {
    -        continue;
    -      }
    -      columnsToScan++;
    -      int dataTypeLength = getDataTypeLength(column, se);
    -      if (dataTypeLength == -1) {
    -        allFieldsFixedLength = false;
    -      } else {
    -        bitWidthAllFixedFields += dataTypeLength;
    -      }
    -    }
    -
    -    if (columnsToScan != 0  && allFieldsFixedLength) {
    -      recordsPerBatch = (int) Math.min(Math.min(batchSize / 
bitWidthAllFixedFields,
    -          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 
DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
    -    }
    -    else {
    -      recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
    -    }
     
         try {
    -      ValueVector vector;
    -      SchemaElement schemaElement;
    -      final ArrayList<VarLengthColumn<? extends ValueVector>> 
varLengthColumns = new ArrayList<>();
    -      // initialize all of the column read status objects
    -      boolean fieldFixedLength;
    -      // the column chunk meta-data is not guaranteed to be in the same 
order as the columns in the schema
    -      // a map is constructed for fast access to the correct 
columnChunkMetadata to correspond
    -      // to an element in the schema
    -      Map<String, Integer> columnChunkMetadataPositionsInList = new 
HashMap<>();
    -      BlockMetaData rowGroupMetadata = 
footer.getBlocks().get(rowGroupIndex);
    -
    -      int colChunkIndex = 0;
    -      for (ColumnChunkMetaData colChunk : rowGroupMetadata.getColumns()) {
    -        
columnChunkMetadataPositionsInList.put(Arrays.toString(colChunk.getPath().toArray()),
 colChunkIndex);
    -        colChunkIndex++;
    -      }
    -      for (int i = 0; i < columns.size(); ++i) {
    -        column = columns.get(i);
    -        columnChunkMetaData = 
rowGroupMetadata.getColumns().get(columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath())));
    -        schemaElement = schemaElements.get(column.getPath()[0]);
    -        MajorType type = 
ParquetToDrillTypeConverter.toMajorType(column.getType(), 
schemaElement.getType_length(),
    -            getDataMode(column), schemaElement, 
fragmentContext.getOptions());
    -        field = MaterializedField.create(toFieldName(column.getPath()), 
type);
    -        // the field was not requested to be read
    -        if ( ! fieldSelected(field)) {
    -          continue;
    -        }
    -
    -        fieldFixedLength = column.getType() != 
PrimitiveType.PrimitiveTypeName.BINARY;
    -        vector = output.addField(field, (Class<? extends ValueVector>) 
TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
    -        if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
    -          if (column.getMaxRepetitionLevel() > 0) {
    -            final RepeatedValueVector repeatedVector = 
RepeatedValueVector.class.cast(vector);
    -            ColumnReader<?> dataReader = 
ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
    -                column, columnChunkMetaData, recordsPerBatch,
    -                repeatedVector.getDataVector(), schemaElement);
    -            varLengthColumns.add(new FixedWidthRepeatedReader(this, 
dataReader,
    -                getTypeLengthInBits(column.getType()), -1, column, 
columnChunkMetaData, false, repeatedVector, schemaElement));
    -          }
    -          else {
    -
    -           ColumnReader<?> cr = 
ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
    -                column, columnChunkMetaData, recordsPerBatch, vector,
    -                schemaElement) ;
    -            columnStatuses.add(cr);
    -          }
    -        } else {
    -          // create a reader and add it to the appropriate list
    -          varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, 
column, columnChunkMetaData, false, vector, schemaElement));
    -        }
    -      }
    -      varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
    -
    -      if (!isStarQuery()) {
    -        List<SchemaPath> projectedColumns = 
Lists.newArrayList(getColumns());
    -        SchemaPath col;
    -        for (int i = 0; i < columnsFound.length; i++) {
    -          col = projectedColumns.get(i);
    -          assert col!=null;
    -          if ( ! columnsFound[i] && !col.equals(STAR_COLUMN)) {
    -            
nullFilledVectors.add((NullableIntVector)output.addField(MaterializedField.create(col.getAsUnescapedPath(),
    -                    Types.optional(TypeProtos.MinorType.INT)),
    -                (Class<? extends ValueVector>) 
TypeHelper.getValueVectorClass(TypeProtos.MinorType.INT, DataMode.OPTIONAL)));
    -
    -          }
    -        }
    -      }
    +      schema.buildSchema(footer, batchSize);
    --- End diff --
    
    Nice improvement. Thanks!


> Refactor Parquet Record Reader
> ------------------------------
>
>                 Key: DRILL-5356
>                 URL: https://issues.apache.org/jira/browse/DRILL-5356
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>            Priority: Minor
>              Labels: ready-to-commit
>             Fix For: 1.11.0
>
>
> The Parquet record reader class is a key part of Drill that has evolved over 
> time to become somewhat hard to follow.
> A number of us are working on Parquet-related tasks and find we have to spend 
> an uncomfortable amount of time trying to understand the code. In particular, 
> this writer needs to figure out how to convince the reader to provide 
> higher-density record batches.
> Rather than continue to decipher the complex code multiple times, this ticket 
> proposes refactoring the code so that it is functionally identical, but 
> structurally cleaner. The result will be faster time to value when working 
> with this code.
> This is a lower-priority change and will be coordinated with others working 
> on this code base. This ticket is only for the record reader class itself; it 
> does not include the various readers and writers that Parquet uses since 
> another project is actively modifying those classes.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to