Github user ppadma commented on a diff in the pull request:
https://github.com/apache/drill/pull/789#discussion_r110277141
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---
@@ -308,163 +232,50 @@ public FragmentContext getFragmentContext() {
  }

  /**
-   * Returns data type length for a given {@see ColumnDescriptor} and it's corresponding
-   * {@see SchemaElement}. Neither is enough information alone as the max
-   * repetition level (indicating if it is an array type) is in the ColumnDescriptor and
-   * the length of a fixed width field is stored at the schema level.
-   *
-   * @return the length if fixed width, else -1
+   * Prepare the Parquet reader. First determine the set of columns to read (the schema
+   * for this read.) Then, create a state object to track the read across calls to
+   * the reader <tt>next()</tt> method. Finally, create one of three readers to
+   * read batches depending on whether this scan is for only fixed-width fields,
+   * contains at least one variable-width field, or is a "mock" scan consisting
+   * only of null fields (fields in the SELECT clause but not in the Parquet file.)
   */
-  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) {
-    if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-      if (column.getMaxRepetitionLevel() > 0) {
-        return -1;
-      }
-      if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-        return se.getType_length() * 8;
-      } else {
-        return getTypeLengthInBits(column.getType());
-      }
-    } else {
-      return -1;
-    }
-  }
-
-  @SuppressWarnings({ "resource", "unchecked" })
  @Override
  public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
    this.operatorContext = operatorContext;
-    if (!isStarQuery()) {
-      columnsFound = new boolean[getColumns().size()];
-      nullFilledVectors = new ArrayList<>();
-    }
-    columnStatuses = new ArrayList<>();
-    List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
-    allFieldsFixedLength = true;
-    ColumnDescriptor column;
-    ColumnChunkMetaData columnChunkMetaData;
-    int columnsToScan = 0;
-    mockRecordsRead = 0;
-
-    MaterializedField field;
+    schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, isStarQuery() ? null : getColumns());

    logger.debug("Reading row group({}) with {} records in file {}.",
        rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
        hadoopPath.toUri().getPath());
-    totalRecordsRead = 0;
-
-    // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below
-    // store a map from column name to converted types if they are non-null
-    Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
-
-    // loop to add up the length of the fixed width columns and build the schema
-    for (int i = 0; i < columns.size(); ++i) {
-      column = columns.get(i);
-      SchemaElement se = schemaElements.get(column.getPath()[0]);
-      MajorType mt = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
-          getDataMode(column), se, fragmentContext.getOptions());
-      field = MaterializedField.create(toFieldName(column.getPath()), mt);
-      if ( ! fieldSelected(field)) {
-        continue;
-      }
-      columnsToScan++;
-      int dataTypeLength = getDataTypeLength(column, se);
-      if (dataTypeLength == -1) {
-        allFieldsFixedLength = false;
-      } else {
-        bitWidthAllFixedFields += dataTypeLength;
-      }
-    }
-
-    if (columnsToScan != 0 && allFieldsFixedLength) {
-      recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
-          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
-    }
-    else {
-      recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
-    }

    try {
-      ValueVector vector;
-      SchemaElement schemaElement;
-      final ArrayList<VarLengthColumn<? extends ValueVector>> varLengthColumns = new ArrayList<>();
-      // initialize all of the column read status objects
-      boolean fieldFixedLength;
-      // the column chunk meta-data is not guaranteed to be in the same order as the columns in the schema
-      // a map is constructed for fast access to the correct columnChunkMetadata to correspond
-      // to an element in the schema
-      Map<String, Integer> columnChunkMetadataPositionsInList = new HashMap<>();
-      BlockMetaData rowGroupMetadata = footer.getBlocks().get(rowGroupIndex);
-
-      int colChunkIndex = 0;
-      for (ColumnChunkMetaData colChunk : rowGroupMetadata.getColumns()) {
-        columnChunkMetadataPositionsInList.put(Arrays.toString(colChunk.getPath().toArray()), colChunkIndex);
-        colChunkIndex++;
-      }
-      for (int i = 0; i < columns.size(); ++i) {
-        column = columns.get(i);
-        columnChunkMetaData = rowGroupMetadata.getColumns().get(columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath())));
-        schemaElement = schemaElements.get(column.getPath()[0]);
-        MajorType type = ParquetToDrillTypeConverter.toMajorType(column.getType(), schemaElement.getType_length(),
-            getDataMode(column), schemaElement, fragmentContext.getOptions());
-        field = MaterializedField.create(toFieldName(column.getPath()), type);
-        // the field was not requested to be read
-        if ( ! fieldSelected(field)) {
-          continue;
-        }
-
-        fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
-        vector = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
-        if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-          if (column.getMaxRepetitionLevel() > 0) {
-            final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector);
-            ColumnReader<?> dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
-                column, columnChunkMetaData, recordsPerBatch,
-                repeatedVector.getDataVector(), schemaElement);
-            varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader,
-                getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, repeatedVector, schemaElement));
-          }
-          else {
-
-            ColumnReader<?> cr = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
-                column, columnChunkMetaData, recordsPerBatch, vector,
-                schemaElement);
-            columnStatuses.add(cr);
-          }
-        } else {
-          // create a reader and add it to the appropriate list
-          varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, vector, schemaElement));
-        }
-      }
-      varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
-
-      if (!isStarQuery()) {
-        List<SchemaPath> projectedColumns = Lists.newArrayList(getColumns());
-        SchemaPath col;
-        for (int i = 0; i < columnsFound.length; i++) {
-          col = projectedColumns.get(i);
-          assert col != null;
-          if ( ! columnsFound[i] && !col.equals(STAR_COLUMN)) {
-            nullFilledVectors.add((NullableIntVector) output.addField(MaterializedField.create(col.getAsUnescapedPath(),
-                Types.optional(TypeProtos.MinorType.INT)),
-                (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(TypeProtos.MinorType.INT, DataMode.OPTIONAL)));
-          }
-        }
-      }
+      schema.buildSchema(footer, batchSize);
--- End diff --
Maybe pass footer in the constructor of ParquetSchema itself?
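That way `ParquetSchema` would own the footer from construction instead of having it threaded through `buildSchema()`. A rough sketch of the idea, purely for illustration (the field names, the reduced `buildSchema()` signature, and the comments are my assumptions, not code from this patch):

```java
import java.util.Collection;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class ParquetSchema {
  private final OptionManager options;
  private final int rowGroupIndex;
  private final ParquetMetadata footer;           // supplied once, at construction
  private final Collection<SchemaPath> selection; // null means star query

  public ParquetSchema(OptionManager options, int rowGroupIndex,
                       ParquetMetadata footer, Collection<SchemaPath> selection) {
    this.options = options;
    this.rowGroupIndex = rowGroupIndex;
    this.footer = footer;
    this.selection = selection;
  }

  // No footer parameter needed: the schema object already holds it.
  public void buildSchema(long batchSize) {
    // ... walk footer.getFileMetaData().getSchema().getColumns() as before ...
  }
}
```

The two call sites in `setup()` above would then shrink to:

```java
schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex,
    footer, isStarQuery() ? null : getColumns());
// ... later, inside the try block:
schema.buildSchema(batchSize);
```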