[ https://issues.apache.org/jira/browse/DRILL-5356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16025435#comment-16025435 ]
ASF GitHub Bot commented on DRILL-5356: --------------------------------------- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/789#discussion_r118591078 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java --- @@ -308,163 +232,50 @@ public FragmentContext getFragmentContext() { } /** - * Returns data type length for a given {@see ColumnDescriptor} and it's corresponding - * {@see SchemaElement}. Neither is enough information alone as the max - * repetition level (indicating if it is an array type) is in the ColumnDescriptor and - * the length of a fixed width field is stored at the schema level. - * - * @return the length if fixed width, else -1 + * Prepare the Parquet reader. First determine the set of columns to read (the schema + * for this read.) Then, create a state object to track the read across calls to + * the reader <tt>next()</tt> method. Finally, create one of three readers to + * read batches depending on whether this scan is for only fixed-width fields, + * contains at least one variable-width field, or is a "mock" scan consisting + * only of null fields (fields in the SELECT clause but not in the Parquet file.) */ - private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) { - if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { - if (column.getMaxRepetitionLevel() > 0) { - return -1; - } - if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { - return se.getType_length() * 8; - } else { - return getTypeLengthInBits(column.getType()); - } - } else { - return -1; - } - } - @SuppressWarnings({ "resource", "unchecked" }) @Override public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException { this.operatorContext = operatorContext; - if (!isStarQuery()) { - columnsFound = new boolean[getColumns().size()]; - nullFilledVectors = new ArrayList<>(); - } - columnStatuses = new ArrayList<>(); - List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns(); - allFieldsFixedLength = true; - ColumnDescriptor column; - ColumnChunkMetaData columnChunkMetaData; - int columnsToScan = 0; - mockRecordsRead = 0; - - MaterializedField field; + schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, isStarQuery() ? null : getColumns()); logger.debug("Reading row group({}) with {} records in file {}.", rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(), hadoopPath.toUri().getPath()); - totalRecordsRead = 0; - - // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below - // store a map from column name to converted types if they are non-null - Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer); - - // loop to add up the length of the fixed width columns and build the schema - for (int i = 0; i < columns.size(); ++i) { - column = columns.get(i); - SchemaElement se = schemaElements.get(column.getPath()[0]); - MajorType mt = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(), - getDataMode(column), se, fragmentContext.getOptions()); - field = MaterializedField.create(toFieldName(column.getPath()), mt); - if ( ! fieldSelected(field)) { - continue; - } - columnsToScan++; - int dataTypeLength = getDataTypeLength(column, se); - if (dataTypeLength == -1) { - allFieldsFixedLength = false; - } else { - bitWidthAllFixedFields += dataTypeLength; - } - } - - if (columnsToScan != 0 && allFieldsFixedLength) { - recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields, - footer.getBlocks().get(0).getColumns().get(0).getValueCount()), DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH); - } - else { - recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH; - } try { - ValueVector vector; - SchemaElement schemaElement; - final ArrayList<VarLengthColumn<? extends ValueVector>> varLengthColumns = new ArrayList<>(); - // initialize all of the column read status objects - boolean fieldFixedLength; - // the column chunk meta-data is not guaranteed to be in the same order as the columns in the schema - // a map is constructed for fast access to the correct columnChunkMetadata to correspond - // to an element in the schema - Map<String, Integer> columnChunkMetadataPositionsInList = new HashMap<>(); - BlockMetaData rowGroupMetadata = footer.getBlocks().get(rowGroupIndex); - - int colChunkIndex = 0; - for (ColumnChunkMetaData colChunk : rowGroupMetadata.getColumns()) { - columnChunkMetadataPositionsInList.put(Arrays.toString(colChunk.getPath().toArray()), colChunkIndex); - colChunkIndex++; - } - for (int i = 0; i < columns.size(); ++i) { - column = columns.get(i); - columnChunkMetaData = rowGroupMetadata.getColumns().get(columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath()))); - schemaElement = schemaElements.get(column.getPath()[0]); - MajorType type = ParquetToDrillTypeConverter.toMajorType(column.getType(), schemaElement.getType_length(), - getDataMode(column), schemaElement, fragmentContext.getOptions()); - field = MaterializedField.create(toFieldName(column.getPath()), type); - // the field was not requested to be read - if ( ! fieldSelected(field)) { - continue; - } - - fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY; - vector = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); - if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { - if (column.getMaxRepetitionLevel() > 0) { - final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector); - ColumnReader<?> dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, - column, columnChunkMetaData, recordsPerBatch, - repeatedVector.getDataVector(), schemaElement); - varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader, - getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, repeatedVector, schemaElement)); - } - else { - - ColumnReader<?> cr = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, - column, columnChunkMetaData, recordsPerBatch, vector, - schemaElement) ; - columnStatuses.add(cr); - } - } else { - // create a reader and add it to the appropriate list - varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, vector, schemaElement)); - } - } - varLengthReader = new VarLenBinaryReader(this, varLengthColumns); - - if (!isStarQuery()) { - List<SchemaPath> projectedColumns = Lists.newArrayList(getColumns()); - SchemaPath col; - for (int i = 0; i < columnsFound.length; i++) { - col = projectedColumns.get(i); - assert col!=null; - if ( ! columnsFound[i] && !col.equals(STAR_COLUMN)) { - nullFilledVectors.add((NullableIntVector)output.addField(MaterializedField.create(col.getAsUnescapedPath(), - Types.optional(TypeProtos.MinorType.INT)), - (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(TypeProtos.MinorType.INT, DataMode.OPTIONAL))); - - } - } - } + schema.buildSchema(footer, batchSize); --- End diff -- Nice improvement. Thanks! > Refactor Parquet Record Reader > ------------------------------ > > Key: DRILL-5356 > URL: https://issues.apache.org/jira/browse/DRILL-5356 > Project: Apache Drill > Issue Type: Improvement > Affects Versions: 1.10.0, 1.11.0 > Reporter: Paul Rogers > Assignee: Paul Rogers > Priority: Minor > Labels: ready-to-commit > Fix For: 1.11.0 > > > The Parquet record reader class is a key part of Drill that has evolved over > time to become somewhat hard to follow. > A number of us are working on Parquet-related tasks and find we have to spend > an uncomfortable amount of time trying to understand the code. In particular, > this writer needs to figure out how to convince the reader to provide > higher-density record batches. > Rather than continue to decypher the complex code multiple times, this ticket > requests to refactor the code to make it functionally identical, but > structurally cleaner. The result will be faster time to value when working > with this code. > This is a lower-priority change and will be coordinated with others working > on this code base. This ticket is only for the record reader class itself; it > does not include the various readers and writers that Parquet uses since > another project is actively modifying those classes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)