http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java deleted file mode 100644 index 703ad1f..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java +++ /dev/null @@ -1,711 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.parquet; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.common.types.TypeProtos.DataMode; -import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.common.types.Types; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.impl.OutputMutator; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.store.RecordReader; -import org.apache.drill.exec.store.parquet.FixedByteAlignedReader.Decimal28Reader; -import org.apache.drill.exec.store.parquet.FixedByteAlignedReader.Decimal38Reader; -import org.apache.drill.exec.store.parquet.NullableFixedByteAlignedReader.NullableDecimal28Reader; -import org.apache.drill.exec.store.parquet.NullableFixedByteAlignedReader.NullableDecimal38Reader; -import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.Decimal28Column; -import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.Decimal38Column; -import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableDecimal28Column; -import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableDecimal38Column; -import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarBinaryColumn; -import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarCharColumn; -import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarLengthColumn; -import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarBinaryColumn; -import 
org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarCharColumn; -import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarLengthColumn; -import org.apache.drill.exec.vector.Decimal28SparseVector; -import org.apache.drill.exec.vector.Decimal38SparseVector; -import org.apache.drill.exec.vector.NullableDecimal28SparseVector; -import org.apache.drill.exec.vector.NullableDecimal38SparseVector; -import org.apache.drill.exec.vector.NullableVarBinaryVector; -import org.apache.drill.exec.vector.NullableVarCharVector; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.VarBinaryVector; -import org.apache.drill.exec.vector.VarCharVector; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import parquet.column.ColumnDescriptor; -import parquet.column.Encoding; -import parquet.format.ConvertedType; -import parquet.format.FileMetaData; -import parquet.format.SchemaElement; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.CodecFactoryExposer; -import parquet.hadoop.ParquetFileWriter; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.schema.PrimitiveType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; - -public class ParquetRecordReader implements RecordReader { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class); - - // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors - private static final int NUMBER_OF_VECTORS = 1; - private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb - private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb - private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024; - - // TODO - should probably find a smarter way to set this, currently 1 megabyte - private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1; - public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1; - private static final String SEPERATOR = System.getProperty("file.separator"); - - - // used for clearing the last n bits of a byte - public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128}; - // used for clearing the first n bits of a byte - public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1}; - - private int bitWidthAllFixedFields; - private boolean allFieldsFixedLength; - private int recordsPerBatch; - private long totalRecords; - private long rowGroupOffset; - - private List<ColumnReader> columnStatuses; - FileSystem fileSystem; - private long batchSize; - Path hadoopPath; - private VarLenBinaryReader varLengthReader; - private ParquetMetadata footer; - private List<SchemaPath> columns; - - public CodecFactoryExposer getCodecFactoryExposer() { - return codecFactoryExposer; - } - - private final CodecFactoryExposer codecFactoryExposer; - - int rowGroupIndex; - - public ParquetRecordReader(FragmentContext fragmentContext, // - String path, // - int rowGroupIndex, // - FileSystem fs, // - CodecFactoryExposer codecFactoryExposer, // - ParquetMetadata footer, // - List<SchemaPath> columns) throws ExecutionSetupException { - this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer, - columns); - } - - public ParquetRecordReader(FragmentContext fragmentContext, long batchSize, - String path, int rowGroupIndex, FileSystem fs, - 
CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer, - List<SchemaPath> columns) throws ExecutionSetupException { - hadoopPath = new Path(path); - fileSystem = fs; - this.codecFactoryExposer = codecFactoryExposer; - this.rowGroupIndex = rowGroupIndex; - this.batchSize = batchSize; - this.footer = footer; - this.columns = columns; - } - - public int getRowGroupIndex() { - return rowGroupIndex; - } - - public int getBitWidthAllFixedFields() { - return bitWidthAllFixedFields; - } - - public long getBatchSize() { - return batchSize; - } - - /** - * @param type a fixed length type from the parquet library enum - * @return the length in pageDataByteArray of the type - */ - public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) { - switch (type) { - case INT64: return 64; - case INT32: return 32; - case BOOLEAN: return 1; - case FLOAT: return 32; - case DOUBLE: return 64; - case INT96: return 96; - // binary and fixed length byte array - default: - throw new IllegalStateException("Length cannot be determined for type " + type); - } - } - - private boolean fieldSelected(MaterializedField field){ - // TODO - not sure if this is how we want to represent this - // for now it makes the existing tests pass, simply selecting - // all available data if no columns are provided - if (this.columns != null){ - for (SchemaPath expr : this.columns){ - if ( field.matches(expr)){ - return true; - } - } - return false; - } - return true; - } - - @Override - public void setup(OutputMutator output) throws ExecutionSetupException { - - columnStatuses = new ArrayList<>(); - totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount(); - List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns(); - allFieldsFixedLength = true; - ColumnDescriptor column; - ColumnChunkMetaData columnChunkMetaData; - int columnsToScan = 0; - - MaterializedField field; - ParquetMetadataConverter metaConverter = new ParquetMetadataConverter(); - FileMetaData fileMetaData; - - // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below - // store a map from column name to converted types if they are non-null - HashMap<String, SchemaElement> schemaElements = new HashMap<>(); - fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer); - for (SchemaElement se : fileMetaData.getSchema()) { - schemaElements.put(se.getName(), se); - } - - // loop to add up the length of the fixed width columns and build the schema - for (int i = 0; i < columns.size(); ++i) { - column = columns.get(i); - logger.debug("name: " + fileMetaData.getSchema().get(i).name); - SchemaElement se = schemaElements.get(column.getPath()[0]); - MajorType mt = toMajorType(column.getType(), se.getType_length(), getDataMode(column), se); - field = MaterializedField.create(toFieldName(column.getPath()),mt); - if ( ! 
fieldSelected(field)){ - continue; - } - columnsToScan++; - // sum the lengths of all of the fixed length fields - if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { - // There is not support for the fixed binary type yet in parquet, leaving a task here as a reminder - // TODO - implement this when the feature is added upstream - if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){ - bitWidthAllFixedFields += se.getType_length() * 8; - } else { - bitWidthAllFixedFields += getTypeLengthInBits(column.getType()); - } - } else { - allFieldsFixedLength = false; - } - } - rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset(); - - // none of the columns in the parquet file matched the request columns from the query - if (columnsToScan == 0){ - throw new ExecutionSetupException("Error reading from parquet file. No columns requested were found in the file."); - } - if (allFieldsFixedLength) { - recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields, - footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535); - } - else { - recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH; - } - - try { - ValueVector v; - ConvertedType convertedType; - SchemaElement schemaElement; - ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>(); - ArrayList<NullableVarLengthColumn> nullableVarLengthColumns = new ArrayList<>(); - // initialize all of the column read status objects - boolean fieldFixedLength = false; - for (int i = 0; i < columns.size(); ++i) { - column = columns.get(i); - columnChunkMetaData = footer.getBlocks().get(rowGroupIndex).getColumns().get(i); - schemaElement = schemaElements.get(column.getPath()[0]); - convertedType = schemaElement.getConverted_type(); - MajorType type = toMajorType(column.getType(), schemaElement.getType_length(), getDataMode(column), schemaElement); - field = MaterializedField.create(toFieldName(column.getPath()), type); - // the field was not requested to be read - if ( ! fieldSelected(field)) continue; - - fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY; - v = output.addField(field, (Class<? 
extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); - if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { - createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v, - schemaElement); - } else { - // create a reader and add it to the appropriate list - getReader(this, -1, column, columnChunkMetaData, false, v, schemaElement, varLengthColumns, nullableVarLengthColumns); - } - } - varLengthReader = new VarLenBinaryReader(this, varLengthColumns, nullableVarLengthColumns); - } catch (SchemaChangeException e) { - throw new ExecutionSetupException(e); - } - } - - private SchemaPath toFieldName(String[] paths) { - return SchemaPath.getCompoundPath(paths); - } - - private TypeProtos.DataMode getDataMode(ColumnDescriptor column) { - if (column.getMaxDefinitionLevel() == 0) { - return TypeProtos.DataMode.REQUIRED; - } else { - return TypeProtos.DataMode.OPTIONAL; - } - } - - private void resetBatch() { - for (ColumnReader column : columnStatuses) { - column.valuesReadInCurrentPass = 0; - } - for (VarLengthColumn r : varLengthReader.columns){ - r.valuesReadInCurrentPass = 0; - } - for (NullableVarLengthColumn r : varLengthReader.nullableColumns){ - r.valuesReadInCurrentPass = 0; - } - } - - /** - * @param fixedLength - * @param descriptor - * @param columnChunkMetaData - * @param allocateSize - the size of the vector to create - * @return - * @throws SchemaChangeException - */ - private boolean createFixedColumnReader(boolean fixedLength, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v, - SchemaElement schemaElement) - throws SchemaChangeException, ExecutionSetupException { - ConvertedType convertedType = schemaElement.getConverted_type(); - // if the column is required - if (descriptor.getMaxDefinitionLevel() == 0){ - if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){ - columnStatuses.add(new BitReader(this, allocateSize, descriptor, columnChunkMetaData, - fixedLength, v, schemaElement)); - } else if (columnChunkMetaData.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){ - int length = schemaElement.type_length; - if (length <= 12) { - columnStatuses.add(new Decimal28Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement)); - } else if (length <= 16) { - columnStatuses.add(new Decimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement)); - } - } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){ - columnStatuses.add(new FixedByteAlignedReader.DateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement)); - } else{ - if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { - columnStatuses.add(new ParquetFixedWidthDictionaryReader(this, allocateSize, descriptor, columnChunkMetaData, - fixedLength, v, schemaElement)); - } else { - columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData, - fixedLength, v, schemaElement)); - } - } - return true; - } - else { // if the column is nullable - if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){ - columnStatuses.add(new NullableBitReader(this, allocateSize, descriptor, columnChunkMetaData, - fixedLength, v, schemaElement)); - } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && 
convertedType == ConvertedType.DATE){ - columnStatuses.add(new NullableFixedByteAlignedReader.NullableDateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement)); - } else if (columnChunkMetaData.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){ - int length = schemaElement.type_length; - if (length <= 12) { - columnStatuses.add(new NullableDecimal28Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement)); - } else if (length <= 16) { - columnStatuses.add(new NullableDecimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement)); - } - } else { - columnStatuses.add(NullableFixedByteAlignedReaders.getNullableColumnReader(this, allocateSize, descriptor, - columnChunkMetaData, fixedLength, v, schemaElement)); - } - return true; - } - } - - public void readAllFixedFields(long recordsToRead, ColumnReader firstColumnStatus) throws IOException { - - for (ColumnReader crs : columnStatuses){ - crs.readAllFixedFields(recordsToRead, firstColumnStatus); - } - } - - @Override - public int next() { - resetBatch(); - long recordsToRead = 0; - try { - ColumnReader firstColumnStatus; - if (columnStatuses.size() > 0){ - firstColumnStatus = columnStatuses.iterator().next(); - } - else{ - if (varLengthReader.columns.size() > 0){ - firstColumnStatus = varLengthReader.columns.iterator().next(); - } - else{ - firstColumnStatus = varLengthReader.nullableColumns.iterator().next(); - } - } - - if (allFieldsFixedLength) { - recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead); - } else { - recordsToRead = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH; - - // going to incorporate looking at length of values and copying the data into a single loop, hopefully it won't - // get too complicated - - //loop through variable length data to find the maximum records that will fit in this batch - // this will be a bit annoying if we want to loop though row groups, columns, pages and then individual variable - // length values... - // jacques believes that variable length fields will be encoded as |length|value|length|value|... 
- // cannot find more information on this right now, will keep looking - } - -// logger.debug("records to read in this pass: {}", recordsToRead); - if (allFieldsFixedLength) { - readAllFixedFields(recordsToRead, firstColumnStatus); - } else { // variable length columns - long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus); - readAllFixedFields(fixedRecordsToRead, firstColumnStatus); - } - - return firstColumnStatus.valuesReadInCurrentPass; - } catch (IOException e) { - throw new DrillRuntimeException(e); - } - } - - static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, - TypeProtos.DataMode mode, SchemaElement schemaElement) { - return toMajorType(primitiveTypeName, 0, mode, schemaElement); - } - - // TODO - move this into ParquetTypeHelper and use code generation to create the list - static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length, - TypeProtos.DataMode mode, SchemaElement schemaElement) { - ConvertedType convertedType = schemaElement.getConverted_type(); - switch (mode) { - - case OPTIONAL: - switch (primitiveTypeName) { - case BINARY: - if (convertedType == null) { - return Types.optional(TypeProtos.MinorType.VARBINARY); - } - switch (convertedType) { - case UTF8: - return Types.optional(MinorType.VARCHAR); - case DECIMAL: - return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision()); - default: - throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); - } - case INT64: - if (convertedType == null) { - return Types.optional(TypeProtos.MinorType.BIGINT); - } - switch(convertedType) { - case DECIMAL: - return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision()); - case FINETIME: - throw new UnsupportedOperationException(); - case TIMESTAMP: - return Types.optional(MinorType.TIMESTAMP); - default: - throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); - } - case INT32: - if (convertedType == null) { - return Types.optional(TypeProtos.MinorType.INT); - } - switch(convertedType) { - case DECIMAL: - return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision()); - case DATE: - return Types.optional(MinorType.DATE); - case TIME: - return Types.optional(MinorType.TIME); - default: - throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); - } - case BOOLEAN: - return Types.optional(TypeProtos.MinorType.BIT); - case FLOAT: - return Types.optional(TypeProtos.MinorType.FLOAT4); - case DOUBLE: - return Types.optional(TypeProtos.MinorType.FLOAT8); - // TODO - Both of these are not supported by the parquet library yet (7/3/13), - // but they are declared here for when they are implemented - case INT96: - return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12) - .setMode(mode).build(); - case FIXED_LEN_BYTE_ARRAY: - if (convertedType == null) { - checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); - return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY) - .setWidth(length).setMode(mode).build(); - } else if (convertedType == ConvertedType.DECIMAL) { - return 
Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision()); - } - default: - throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName); - } - case REQUIRED: - switch (primitiveTypeName) { - case BINARY: - if (convertedType == null) { - return Types.required(TypeProtos.MinorType.VARBINARY); - } - switch (convertedType) { - case UTF8: - return Types.required(MinorType.VARCHAR); - case DECIMAL: - return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision()); - default: - throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); - } - case INT64: - if (convertedType == null) { - return Types.required(MinorType.BIGINT); - } - switch(convertedType) { - case DECIMAL: - return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision()); - case FINETIME: - throw new UnsupportedOperationException(); - case TIMESTAMP: - return Types.required(MinorType.TIMESTAMP); - default: - throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); - } - case INT32: - if (convertedType == null) { - return Types.required(MinorType.INT); - } - switch(convertedType) { - case DECIMAL: - return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision()); - case DATE: - return Types.required(MinorType.DATE); - case TIME: - return Types.required(MinorType.TIME); - default: - throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); - } - case BOOLEAN: - return Types.required(TypeProtos.MinorType.BIT); - case FLOAT: - return Types.required(TypeProtos.MinorType.FLOAT4); - case DOUBLE: - return Types.required(TypeProtos.MinorType.FLOAT8); - // Both of these are not supported by the parquet library yet (7/3/13), - // but they are declared here for when they are implemented - case INT96: - return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12) - .setMode(mode).build(); - case FIXED_LEN_BYTE_ARRAY: - if (convertedType == null) { - checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); - return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY) - .setWidth(length).setMode(mode).build(); - } else if (convertedType == ConvertedType.DECIMAL) { - return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision()); - } - default: - throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName); - } - case REPEATED: - switch (primitiveTypeName) { - case BINARY: - if (convertedType == null) { - return Types.repeated(TypeProtos.MinorType.VARBINARY); - } - switch (schemaElement.getConverted_type()) { - case UTF8: - return Types.repeated(MinorType.VARCHAR); - case DECIMAL: - return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision()); - default: - throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); - } - case INT64: - if (convertedType == null) { - return Types.repeated(MinorType.BIGINT); - } - 
switch(convertedType) { - case DECIMAL: - return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision()); - case FINETIME: - throw new UnsupportedOperationException(); - case TIMESTAMP: - return Types.repeated(MinorType.TIMESTAMP); - default: - throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); - } - case INT32: - if (convertedType == null) { - return Types.repeated(MinorType.INT); - } - switch(convertedType) { - case DECIMAL: - return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision()); - case DATE: - return Types.repeated(MinorType.DATE); - case TIME: - return Types.repeated(MinorType.TIME); - default: - throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); - } - case BOOLEAN: - return Types.repeated(TypeProtos.MinorType.BIT); - case FLOAT: - return Types.repeated(TypeProtos.MinorType.FLOAT4); - case DOUBLE: - return Types.repeated(TypeProtos.MinorType.FLOAT8); - // Both of these are not supported by the parquet library yet (7/3/13), - // but they are declared here for when they are implemented - case INT96: - return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12) - .setMode(mode).build(); - case FIXED_LEN_BYTE_ARRAY: - if (convertedType == null) { - checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); - return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY) - .setWidth(length).setMode(mode).build(); - } else if (convertedType == ConvertedType.DECIMAL) { - return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision()); - } - default: - throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName); - } - } - throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName + " Mode: " + mode); - } - - private static void getReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, - SchemaElement schemaElement, List<VarLengthColumn> varLengthColumns, - List<NullableVarLengthColumn> nullableVarLengthColumns) throws ExecutionSetupException { - ConvertedType convertedType = schemaElement.getConverted_type(); - switch (descriptor.getMaxDefinitionLevel()) { - case 0: - if (convertedType == null) { - varLengthColumns.add(new VarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement)); - return; - } - switch (convertedType) { - case UTF8: - varLengthColumns.add(new VarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarCharVector) v, schemaElement)); - return; - case DECIMAL: - if (v instanceof Decimal28SparseVector) { - varLengthColumns.add(new Decimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal28SparseVector) v, schemaElement)); - return; - } else if (v instanceof Decimal38SparseVector) { - varLengthColumns.add(new Decimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal38SparseVector) v, schemaElement)); - return; - } - default: - } - default: - if (convertedType == null) { - 
nullableVarLengthColumns.add(new NullableVarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement)); - return; - } - switch (convertedType) { - case UTF8: - nullableVarLengthColumns.add(new NullableVarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarCharVector) v, schemaElement)); - return; - case DECIMAL: - if (v instanceof NullableDecimal28SparseVector) { - nullableVarLengthColumns.add(new NullableDecimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal28SparseVector) v, schemaElement)); - return; - } else if (v instanceof NullableDecimal38SparseVector) { - nullableVarLengthColumns.add(new NullableDecimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal38SparseVector) v, schemaElement)); - return; - } - default: - } - } - throw new UnsupportedOperationException(); - } - - private static MinorType getDecimalType(SchemaElement schemaElement) { - return schemaElement.getPrecision() <= 28 ? MinorType.DECIMAL28SPARSE : MinorType.DECIMAL38SPARSE; - } - - static String join(String delimiter, String... str) { - StringBuilder builder = new StringBuilder(); - int i = 0; - for (String s : str) { - builder.append(s); - if (i < str.length) { - builder.append(delimiter); - } - i++; - } - return builder.toString(); - } - - @Override - public void cleanup() { - for (ColumnReader column : columnStatuses) { - column.clear(); - } - columnStatuses.clear(); - - for (VarLengthColumn r : varLengthReader.columns){ - r.clear(); - } - for (NullableVarLengthColumn r : varLengthReader.nullableColumns){ - r.clear(); - } - varLengthReader.columns.clear(); - varLengthReader.nullableColumns.clear(); - } -}
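A note on the variable-length scan that the deleted reader performs: the comment in next() above records that variable-length fields are laid out on the page as |length|value|length|value|..., and the VarLenBinaryReader removed further below walks exactly that layout, reading a 4-byte little-endian length prefix (BytesUtils.readIntLittleEndian), checking whether the value still fits in the current batch, and advancing readPosInBytes by length + 4. The following is a minimal standalone sketch of that scan under those assumptions only; it is not Drill code, and the class name VarLenScanSketch, the method countValuesThatFit, and the budgetBytes parameter are hypothetical names used purely for illustration.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class VarLenScanSketch {

  // Count how many |length|value| entries fit within budgetBytes, mirroring the
  // batch-sizing scan in the deleted VarLenBinaryReader: the length prefix is a
  // 4-byte little-endian int, so each entry costs (length + 4) bytes.
  static int countValuesThatFit(byte[] page, int budgetBytes) {
    int pos = 0;
    int bytesUsed = 0;
    int values = 0;
    while (pos + 4 <= page.length) {
      // read the 4-byte little-endian length prefix at the current position
      int len = ByteBuffer.wrap(page, pos, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
      int entrySize = 4 + len;                        // prefix plus payload
      if (pos + entrySize > page.length) break;       // truncated entry, stop
      if (bytesUsed + entrySize > budgetBytes) break; // would exceed the batch budget
      bytesUsed += entrySize;
      pos += entrySize;
      values++;
    }
    return values;
  }

  public static void main(String[] args) {
    // build a tiny page holding three UTF-8 values: "a", "bcd", "efghij"
    ByteBuffer buf = ByteBuffer.allocate(64).order(ByteOrder.LITTLE_ENDIAN);
    for (String s : new String[] {"a", "bcd", "efghij"}) {
      byte[] b = s.getBytes(StandardCharsets.UTF_8);
      buf.putInt(b.length).put(b);
    }
    byte[] page = new byte[buf.position()];
    System.arraycopy(buf.array(), 0, page, 0, page.length);

    // entries cost 5, 7 and 10 bytes; with a 12-byte budget only the first two fit
    System.out.println(countValuesThatFit(page, 12)); // prints 2
  }
}

The actual reader also consults definition levels to skip null entries and handles dictionary-encoded values, so the byte-budget comparison is only one of several conditions that can end a batch; the sketch keeps just the length-prefix walk.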
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index b26f688..a336316 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -52,6 +52,7 @@ import static java.lang.Math.min; import static java.lang.String.format; public class ParquetRecordWriter extends ParquetOutputRecordWriter { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class); private static final int MINIMUM_BUFFER_SIZE = 64 * 1024; private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; @@ -147,7 +148,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record. long memSize = store.memSize(); if (memSize > blockSize) { - System.out.println("Reached block size " + blockSize); + logger.debug("Reached block size " + blockSize); flush(); newSchema(); recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index df6581f..b4f02fb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.store.parquet; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,6 +37,7 @@ import org.apache.drill.exec.store.RecordReader; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java deleted file mode 100644 index 813a799..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.parquet; - -import java.io.IOException; -import java.util.List; - -import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarLengthColumn; -import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarLengthColumn; - -import parquet.bytes.BytesUtils; - -public class VarLenBinaryReader { - - ParquetRecordReader parentReader; - final List<VarLengthColumn> columns; - final List<NullableVarLengthColumn> nullableColumns; - - public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns, - List<NullableVarLengthColumn> nullableColumns){ - this.parentReader = parentReader; - this.nullableColumns = nullableColumns; - this.columns = columns; - } - - /** - * Reads as many variable length values as possible. - * - * @param recordsToReadInThisPass - the number of records recommended for reading form the reader - * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metatdata from - * @return - the number of fixed length fields that will fit in the batch - * @throws IOException - */ - public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException { - - long recordsReadInCurrentPass = 0; - int lengthVarFieldsInCurrentRecord; - boolean rowGroupFinished = false; - byte[] bytes; - // write the first 0 offset - for (ColumnReader columnReader : columns) { - columnReader.bytesReadInCurrentPass = 0; - columnReader.valuesReadInCurrentPass = 0; - } - // same for the nullable columns - for (NullableVarLengthColumn columnReader : nullableColumns) { - columnReader.bytesReadInCurrentPass = 0; - columnReader.valuesReadInCurrentPass = 0; - columnReader.nullsRead = 0; - } - outer: do { - lengthVarFieldsInCurrentRecord = 0; - for (VarLengthColumn columnReader : columns) { - if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){ - rowGroupFinished = true; - break; - } - if (columnReader.pageReadStatus.currentPage == null - || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) { - columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead; - if (!columnReader.pageReadStatus.next()) { - rowGroupFinished = true; - break; - } - } - bytes = columnReader.pageReadStatus.pageDataByteArray; - - // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division - columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes, - (int) columnReader.pageReadStatus.readPosInBytes); - lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits; - - if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) { - break outer; - } - - } - for (NullableVarLengthColumn columnReader : nullableColumns) { - // check to make sure there is capacity for the next value (for nullables this is a check to see if there is - // 
still space in the nullability recording vector) - if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){ - rowGroupFinished = true; - break; - } - if (columnReader.pageReadStatus.currentPage == null - || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) { - if (!columnReader.pageReadStatus.next()) { - rowGroupFinished = true; - break; - } else { - columnReader.currDictVal = null; - } - } - bytes = columnReader.pageReadStatus.pageDataByteArray; - // we need to read all of the lengths to determine if this value will fit in the current vector, - // as we can only read each definition level once, we have to store the last one as we will need it - // at the start of the next read if we decide after reading all of the varlength values in this record - // that it will not fit in this batch - if ( columnReader.currDefLevel == -1 ) { - columnReader.currDefLevel = columnReader.pageReadStatus.definitionLevels.readInteger(); - } - if ( columnReader.columnDescriptor.getMaxDefinitionLevel() > columnReader.currDefLevel){ - columnReader.currentValNull = true; - columnReader.dataTypeLengthInBits = 0; - columnReader.nullsRead++; - continue;// field is null, no length to add to data vector - } - - if (columnReader.usingDictionary) { - if (columnReader.currDictVal == null) { - columnReader.currDictVal = columnReader.pageReadStatus.valueReader.readBytes(); - } - // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division - columnReader.dataTypeLengthInBits = columnReader.currDictVal.length(); - } - else { - // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division - columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes, - (int) columnReader.pageReadStatus.readPosInBytes); - } - lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits; - - if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) { - break outer; - } - } - // check that the next record will fit in the batch - if (rowGroupFinished || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + lengthVarFieldsInCurrentRecord - > parentReader.getBatchSize()){ - break outer; - } - for (VarLengthColumn columnReader : columns) { - bytes = columnReader.pageReadStatus.pageDataByteArray; - // again, I am re-purposing the unused field here, it is a length n BYTES, not bits - boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes, - (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits); - assert success; - columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4; - columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4; - columnReader.pageReadStatus.valuesRead++; - columnReader.valuesReadInCurrentPass++; - } - for (NullableVarLengthColumn columnReader : nullableColumns) { - bytes = columnReader.pageReadStatus.pageDataByteArray; - // again, I am re-purposing the unused field here, it is a length n BYTES, not bits - if (!columnReader.currentValNull && columnReader.dataTypeLengthInBits > 0){ - boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes, - (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits); - assert success; - } - columnReader.currentValNull = false; - columnReader.currDefLevel = -1; - if (columnReader.dataTypeLengthInBits > 0){ - 
columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4; - columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4; - } - columnReader.pageReadStatus.valuesRead++; - columnReader.valuesReadInCurrentPass++; - if ( columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) { - columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead; - columnReader.pageReadStatus.next(); - } - columnReader.currDictVal = null; - } - recordsReadInCurrentPass++; - } while (recordsReadInCurrentPass < recordsToReadInThisPass); - for (VarLengthColumn columnReader : columns) { - columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass); - } - for (NullableVarLengthColumn columnReader : nullableColumns) { - columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass); - } - return recordsReadInCurrentPass; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java deleted file mode 100644 index 56f687c..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java +++ /dev/null @@ -1,365 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- ******************************************************************************/ -package org.apache.drill.exec.store.parquet; - -import java.math.BigDecimal; - -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.util.DecimalUtility; -import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; -import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; -import org.apache.drill.exec.vector.Decimal28SparseVector; -import org.apache.drill.exec.vector.Decimal38SparseVector; -import org.apache.drill.exec.vector.NullableDecimal28SparseVector; -import org.apache.drill.exec.vector.NullableDecimal38SparseVector; -import org.apache.drill.exec.vector.NullableVarBinaryVector; -import org.apache.drill.exec.vector.NullableVarCharVector; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.VarBinaryVector; -import org.apache.drill.exec.vector.VarCharVector; - -import parquet.column.ColumnDescriptor; -import parquet.column.Encoding; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.io.api.Binary; - -public class VarLengthColumnReaders { - - public static abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader { - - Binary currDictVal; - - VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, - SchemaElement schemaElement) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { - usingDictionary = true; - } - else { - usingDictionary = false; - } - } - - @Override - protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { - throw new UnsupportedOperationException(); - } - - public abstract boolean setSafe(int index, byte[] bytes, int start, int length); - - public abstract int capacity(); - - } - - public static abstract class NullableVarLengthColumn<V extends ValueVector> extends ColumnReader { - - int nullsRead; - boolean currentValNull = false; - Binary currDictVal; - - NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, - SchemaElement schemaElement) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - } - - public abstract boolean setSafe(int index, byte[] value, int start, int length); - - public abstract int capacity(); - - @Override - protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { - throw new UnsupportedOperationException(); - } - } - - public static class Decimal28Column extends VarLengthColumn<Decimal28SparseVector> { - - protected Decimal28SparseVector decimal28Vector; - - Decimal28Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal28SparseVector v, - SchemaElement schemaElement) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - this.decimal28Vector = v; - } - - @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { - int width = Decimal28SparseHolder.WIDTH; - BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); - if (index >= decimal28Vector.getValueCapacity()) { - return false; - } - DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(), - schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits); - return true; - } - - @Override - public int capacity() { - return decimal28Vector.getData().capacity(); - } - } - - public static class NullableDecimal28Column extends NullableVarLengthColumn<NullableDecimal28SparseVector> { - - protected NullableDecimal28SparseVector nullableDecimal28Vector; - - NullableDecimal28Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal28SparseVector v, - SchemaElement schemaElement) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - nullableDecimal28Vector = v; - } - - @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { - int width = Decimal28SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); - if (index >= nullableDecimal28Vector.getValueCapacity()) { - return false; - } - DecimalUtility.getSparseFromBigDecimal(intermediate, nullableDecimal28Vector.getData(), index * width, schemaElement.getScale(), - schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits); - nullableDecimal28Vector.getMutator().setIndexDefined(index); - return true; - } - - @Override - public int capacity() { - return nullableDecimal28Vector.getData().capacity(); - } - } - - public static class Decimal38Column extends VarLengthColumn<Decimal38SparseVector> { - - protected Decimal38SparseVector decimal28Vector; - - Decimal38Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal38SparseVector v, - SchemaElement schemaElement) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - decimal28Vector = v; - } - - @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { - int width = Decimal38SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); - if (index >= decimal28Vector.getValueCapacity()) { - return false; - } - DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(), - schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits); - return true; - } - - @Override - public int capacity() { - return decimal28Vector.getData().capacity(); - } - } - - public static class NullableDecimal38Column extends NullableVarLengthColumn<NullableDecimal38SparseVector> { - - protected NullableDecimal38SparseVector nullableDecimal38Vector; - - NullableDecimal38Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal38SparseVector v, - SchemaElement schemaElement) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - nullableDecimal38Vector = v; - } - - @Override - public boolean 
setSafe(int index, byte[] bytes, int start, int length) { - int width = Decimal38SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); - if (index >= nullableDecimal38Vector.getValueCapacity()) { - return false; - } - DecimalUtility.getSparseFromBigDecimal(intermediate, nullableDecimal38Vector.getData(), index * width, schemaElement.getScale(), - schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits); - nullableDecimal38Vector.getMutator().setIndexDefined(index); - return true; - } - - @Override - public int capacity() { - return nullableDecimal38Vector.getData().capacity(); - } - } - - - public static class VarCharColumn extends VarLengthColumn <VarCharVector> { - - // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting - protected VarCharVector varCharVector; - - VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v, - SchemaElement schemaElement) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - varCharVector = v; - } - - @Override - protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { - boolean success; - if(index >= varCharVector.getValueCapacity()) return false; - - if (usingDictionary) { - success = varCharVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(), - 0, currDictVal.length()); - } - else { - success = varCharVector.getMutator().setSafe(index, bytes, start, length); - } - return success; - } - - @Override - public int capacity() { - return varCharVector.getData().capacity(); - } - } - - public static class NullableVarCharColumn extends NullableVarLengthColumn <NullableVarCharVector> { - - int nullsRead; - boolean currentValNull = false; - // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting - protected NullableVarCharVector nullableVarCharVector; - - NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v, - SchemaElement schemaElement) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - nullableVarCharVector = v; - } - - public boolean setSafe(int index, byte[] value, int start, int length) { - boolean success; - if(index >= nullableVarCharVector.getValueCapacity()) return false; - - if (usingDictionary) { - success = nullableVarCharVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(), - 0, currDictVal.length()); - } - else { - success = nullableVarCharVector.getMutator().setSafe(index, value, start, length); - } - return success; - } - - @Override - public int capacity() { - return nullableVarCharVector.getData().capacity(); - } - - @Override - protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { - throw new UnsupportedOperationException(); - } - } - - public static class VarBinaryColumn extends VarLengthColumn <VarBinaryVector> { - - // store a hard reference to the vector (which is also stored in the superclass) 
to prevent repetitive casting - protected VarBinaryVector varBinaryVector; - - VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v, - SchemaElement schemaElement) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - varBinaryVector = v; - } - - @Override - protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { - boolean success; - if(index >= varBinaryVector.getValueCapacity()) return false; - - if (usingDictionary) { - success = varBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(), - 0, currDictVal.length()); - } - else { - success = varBinaryVector.getMutator().setSafe(index, bytes, start, length); - } - return success; - } - - @Override - public int capacity() { - return varBinaryVector.getData().capacity(); - } - } - - public static class NullableVarBinaryColumn extends NullableVarLengthColumn <NullableVarBinaryVector> { - - int nullsRead; - boolean currentValNull = false; - // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting - protected org.apache.drill.exec.vector.NullableVarBinaryVector nullableVarBinaryVector; - - NullableVarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v, - SchemaElement schemaElement) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - nullableVarBinaryVector = v; - } - - public boolean setSafe(int index, byte[] value, int start, int length) { - boolean success; - if(index >= nullableVarBinaryVector.getValueCapacity()) return false; - - if (usingDictionary) { - success = nullableVarBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(), - 0, currDictVal.length()); - } - else { - success = nullableVarBinaryVector.getMutator().setSafe(index, value, start, length); - } - return success; - } - - @Override - public int capacity() { - return nullableVarBinaryVector.getData().capacity(); - } - - @Override - protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { - throw new UnsupportedOperationException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java new file mode 100644 index 0000000..2c6e488 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.vector.BaseDataValueVector; +import org.apache.drill.exec.vector.ValueVector; +import parquet.column.ColumnDescriptor; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; + +final class BitReader extends ColumnReader { + + private byte currentByte; + private byte nextByte; + private byte[] bytes; + + BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + @Override + protected void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + readStartInBytes = pageReader.readPosInBytes; + readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; + readLength = (int) Math.ceil(readLengthInBits / 8.0); + + bytes = pageReader.pageDataByteArray; + // standard read, using memory mapping + if (pageReader.bitShift == 0) { + ((BaseDataValueVector) valueVec).getData().writeBytes(bytes, + (int) readStartInBytes, (int) readLength); + } else { // read in individual values, because a bitshift is necessary with where the last page or batch ended + + vectorData = ((BaseDataValueVector) valueVec).getData(); + nextByte = bytes[(int) Math.max(0, Math.ceil(pageReader.valuesRead / 8.0) - 1)]; + readLengthInBits = recordsReadInThisIteration + pageReader.bitShift; + + int i = 0; + // read individual bytes with appropriate shifting + for (; i < (int) readLength; i++) { + currentByte = nextByte; + currentByte = (byte) (currentByte >>> pageReader.bitShift); + // mask the bits about to be added from the next byte + currentByte = (byte) (currentByte & ParquetRecordReader.startBitMasks[pageReader.bitShift - 1]); + // if we are not on the last byte + if ((int) Math.ceil(pageReader.valuesRead / 8.0) + i < pageReader.byteLength) { + // grab the next byte from the buffer, shift and mask it, and OR it with the leftover bits + nextByte = bytes[(int) Math.ceil(pageReader.valuesRead / 8.0) + i]; + currentByte = (byte) (currentByte | nextByte + << (8 - pageReader.bitShift) + & ParquetRecordReader.endBitMasks[8 - pageReader.bitShift - 1]); + } + vectorData.setByte(valuesReadInCurrentPass / 8 + i, currentByte); + } + vectorData.setIndex(0, (valuesReadInCurrentPass / 8) + + (int) readLength - 1); + vectorData.capacity(vectorData.writerIndex() + 1); + } + + // check if the values in this page did not end on a byte boundary, store a number of bits the next page must be + // shifted by to read all of the values into the 
vector without leaving space + if (readLengthInBits % 8 != 0) { + pageReader.bitShift = (int) readLengthInBits % 8; + } else { + pageReader.bitShift = 0; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java new file mode 100644 index 0000000..fd672d6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import io.netty.buffer.ByteBuf; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.vector.BaseValueVector; +import org.apache.drill.exec.vector.ValueVector; +import parquet.column.ColumnDescriptor; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; +import parquet.schema.PrimitiveType; +import parquet.schema.PrimitiveType.PrimitiveTypeName; + +import java.io.IOException; + +public abstract class ColumnReader<V extends ValueVector> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnReader.class); + + final ParquetRecordReader parentReader; + + // Value Vector for this column + final V valueVec; + + ColumnDescriptor getColumnDescriptor() { + return columnDescriptor; + } + + // column description from the parquet library + final ColumnDescriptor columnDescriptor; + // metadata of the column, from the parquet library + final ColumnChunkMetaData columnChunkMetaData; + // status information on the current page + PageReader pageReader; + + final SchemaElement schemaElement; + boolean usingDictionary; + + // quick reference to see if the field is fixed length (as this requires an instanceof) + final boolean isFixedLength; + + // counter for the total number of values read from one or more pages + // when a batch is filled all of these values should be the same for all of the columns + int totalValuesRead; + + // counter for the values that have been read in this pass (a single call to the next() method) + int valuesReadInCurrentPass; + + // length of single data value in bits, if the length is fixed + int dataTypeLengthInBits; + int bytesReadInCurrentPass; + + protected ByteBuf vectorData; + // when reading definition levels for nullable columns, it is a one-way stream of integers + // when reading var length data, where we don't know if all 
of the records will fit until we've read all of them + // we must store the last definition level an use it in at the start of the next batch + int currDefLevel; + + // variables for a single read pass + long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0; + + protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException { + this.parentReader = parentReader; + this.columnDescriptor = descriptor; + this.columnChunkMetaData = columnChunkMetaData; + this.isFixedLength = fixedLength; + this.schemaElement = schemaElement; + this.valueVec = v; + this.pageReader = new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(), columnChunkMetaData); + + if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { + if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { + dataTypeLengthInBits = columnDescriptor.getTypeLength() * 8; + } else { + dataTypeLengthInBits = ParquetRecordReader.getTypeLengthInBits(columnDescriptor.getType()); + } + } + + } + + public int getRecordsReadInCurrentPass() { + return valuesReadInCurrentPass; + } + + public void processPages(long recordsToReadInThisPass) throws IOException { + reset(); + do { + determineSize(recordsToReadInThisPass, 0); + + } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null); + valueVec.getMutator().setValueCount(valuesReadInCurrentPass); + } + + public void clear() { + valueVec.clear(); + this.pageReader.clear(); + } + + public void readValues(long recordsToRead) { + readField(recordsToRead); + + valuesReadInCurrentPass += recordsReadInThisIteration; + totalValuesRead += recordsReadInThisIteration; + pageReader.valuesRead += recordsReadInThisIteration; + pageReader.readPosInBytes = readStartInBytes + readLength; + } + + protected abstract void readField(long recordsToRead); + + /** + * Determines the size of a single value in a variable column. 
+ * + * Return value indicates if we have finished a row group and should stop reading + * + * @param recordsReadInCurrentPass + * @param lengthVarFieldsInCurrentRecord + * @return - true if we should stop reading + * @throws IOException + */ + public boolean determineSize(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord) throws IOException { + + boolean doneReading = readPage(); + if (doneReading) + return true; + + doneReading = processPageData((int) recordsReadInCurrentPass); + if (doneReading) + return true; + + lengthVarFieldsInCurrentRecord += dataTypeLengthInBits; + + doneReading = checkVectorCapacityReached(); + if (doneReading) + return true; + + return false; + } + + protected void readRecords(int recordsToRead) { + for (int i = 0; i < recordsToRead; i++) { + readField(i); + } + pageReader.valuesRead += recordsToRead; + } + + protected boolean processPageData(int recordsToReadInThisPass) throws IOException { + readValues(recordsToReadInThisPass); + return true; + } + + public void updatePosition() {} + + public void updateReadyToReadPosition() {} + + public void reset() { + readStartInBytes = 0; + readLength = 0; + readLengthInBits = 0; + recordsReadInThisIteration = 0; + bytesReadInCurrentPass = 0; + vectorData = ((BaseValueVector) valueVec).getData(); + } + + public int capacity() { + return (int) (valueVec.getValueCapacity() * dataTypeLengthInBits / 8.0); + } + + // Read a page if we need more data, returns true if we need to exit the read loop + public boolean readPage() throws IOException { + if (pageReader.currentPage == null + || totalValuesReadAndReadyToReadInPage() == pageReader.currentPage.getValueCount()) { + readRecords(pageReader.valuesReadyToRead); + if (pageReader.currentPage != null) + totalValuesRead += pageReader.currentPage.getValueCount(); + if (!pageReader.next()) { + hitRowGroupEnd(); + return true; + } + postPageRead(); + } + return false; + } + + protected int totalValuesReadAndReadyToReadInPage() { + return pageReader.valuesRead + pageReader.valuesReadyToRead; + } + + protected void postPageRead() { + pageReader.valuesReadyToRead = 0; + } + + protected void hitRowGroupEnd() {} + + protected boolean checkVectorCapacityReached() { + if (bytesReadInCurrentPass + dataTypeLengthInBits > capacity()) { + logger.debug("Reached the capacity of the data vector in a variable length value vector."); + return true; + } + else if (valuesReadInCurrentPass > valueVec.getValueCapacity()){ + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java new file mode 100644 index 0000000..243744e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -0,0 +1,175 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.exception.SchemaChangeException; + +import org.apache.drill.exec.vector.Decimal28SparseVector; +import org.apache.drill.exec.vector.Decimal38SparseVector; +import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.NullableDecimal28SparseVector; +import org.apache.drill.exec.vector.NullableDecimal38SparseVector; +import org.apache.drill.exec.vector.NullableFloat4Vector; +import org.apache.drill.exec.vector.NullableFloat8Vector; +import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.NullableVarBinaryVector; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VarBinaryVector; +import org.apache.drill.exec.vector.VarCharVector; +import parquet.column.ColumnDescriptor; +import parquet.column.Encoding; +import parquet.format.ConvertedType; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; +import parquet.schema.PrimitiveType; + +public class ColumnReaderFactory { + + /** + * @param fixedLength + * @param descriptor + * @param columnChunkMetaData + * @param allocateSize - the size of the vector to create + * @return + * @throws SchemaChangeException + */ + static ColumnReader createFixedColumnReader(ParquetRecordReader recordReader, boolean fixedLength, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v, + SchemaElement schemaElement) + throws Exception { + ConvertedType convertedType = schemaElement.getConverted_type(); + // if the column is required, or repeated (in which case we just want to use this to generate our appropriate + // ColumnReader for actually transferring data into the data vector inside of our repeated vector + if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0){ + if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){ + return new BitReader(recordReader, allocateSize, descriptor, columnChunkMetaData, + fixedLength, v, schemaElement); + } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){ + int length = schemaElement.type_length; + if (length <= 12) { + return new FixedByteAlignedReader.Decimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } else if (length <= 16) { + return new FixedByteAlignedReader.Decimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && 
convertedType == ConvertedType.DATE){ + return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } else{ + if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { + return new ParquetFixedWidthDictionaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, + fixedLength, v, schemaElement); + } else { + return new FixedByteAlignedReader(recordReader, allocateSize, descriptor, columnChunkMetaData, + fixedLength, v, schemaElement); + } + } + } + else { // if the column is nullable + if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){ + return new NullableBitReader(recordReader, allocateSize, descriptor, columnChunkMetaData, + fixedLength, v, schemaElement); + } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){ + return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){ + int length = schemaElement.type_length; + if (length <= 12) { + return new NullableFixedByteAlignedReaders.NullableDecimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } else if (length <= 16) { + return new NullableFixedByteAlignedReaders.NullableDecimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + } else { + return getNullableColumnReader(recordReader, allocateSize, descriptor, + columnChunkMetaData, fixedLength, v, schemaElement); + } + } + throw new Exception("Unexpected parquet metadata configuration."); + } + + static VarLengthValuesColumn getReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, + SchemaElement schemaElement + ) throws ExecutionSetupException { + ConvertedType convertedType = schemaElement.getConverted_type(); + switch (descriptor.getMaxDefinitionLevel()) { + case 0: + if (convertedType == null) { + return new VarLengthColumnReaders.VarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement); + } + switch (convertedType) { + case UTF8: + return new VarLengthColumnReaders.VarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarCharVector) v, schemaElement); + case DECIMAL: + if (v instanceof Decimal28SparseVector) { + return new VarLengthColumnReaders.Decimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal28SparseVector) v, schemaElement); + } else if (v instanceof Decimal38SparseVector) { + return new VarLengthColumnReaders.Decimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal38SparseVector) v, schemaElement); + } + default: + } + default: + if (convertedType == null) { + return new VarLengthColumnReaders.NullableVarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement); + } + switch (convertedType) { + case UTF8: + return new VarLengthColumnReaders.NullableVarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, 
(NullableVarCharVector) v, schemaElement); + case DECIMAL: + if (v instanceof NullableDecimal28SparseVector) { + return new VarLengthColumnReaders.NullableDecimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal28SparseVector) v, schemaElement); + } else if (v instanceof NullableDecimal38SparseVector) { + return new VarLengthColumnReaders.NullableDecimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal38SparseVector) v, schemaElement); + } + default: + } + } + throw new UnsupportedOperationException(); + } + + public static NullableColumnReader getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize, + ColumnDescriptor columnDescriptor, + ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, + ValueVector valueVec, + SchemaElement schemaElement) throws ExecutionSetupException { + if (! columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { + return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, + fixedLength, valueVec, schemaElement); + } else { + if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) { + return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, + fixedLength, (NullableBigIntVector)valueVec, schemaElement); + } + else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32) { + return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, + fixedLength, (NullableIntVector)valueVec, schemaElement); + } + else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT) { + return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, + fixedLength, (NullableFloat4Vector)valueVec, schemaElement); + } + else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE) { + return new NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, + fixedLength, (NullableFloat8Vector)valueVec, schemaElement); + } + else{ + throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name() ); + } + } + } +}
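
The setSafe(...) implementations in the VarLengthColumnReaders classes above all follow the same contract: instead of growing the vector or throwing, they return false once the target value vector is out of capacity, so the calling reader can close out the current batch and resume at the same value in the next pass. Below is a minimal, self-contained sketch of that contract; SimpleVarCharWriter and its byte-array backing are hypothetical stand-ins, not Drill's actual vector or mutator classes, which manage off-heap buffers and offset vectors.

import java.nio.charset.StandardCharsets;

class SimpleVarCharWriter {
  private final byte[][] values;

  SimpleVarCharWriter(int capacity) { this.values = new byte[capacity][]; }

  // Mirrors the VarCharColumn.setSafe contract: return false rather than grow,
  // so the column reader can stop and let the next batch pick up from here.
  boolean setSafe(int index, byte[] bytes, int start, int length) {
    if (index >= values.length) return false;
    values[index] = new byte[length];
    System.arraycopy(bytes, start, values[index], 0, length);
    return true;
  }

  public static void main(String[] args) {
    SimpleVarCharWriter w = new SimpleVarCharWriter(2);
    byte[] data = "abc".getBytes(StandardCharsets.UTF_8);
    System.out.println(w.setSafe(0, data, 0, data.length)); // true
    System.out.println(w.setSafe(2, data, 0, data.length)); // false: batch is full
  }
}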
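BitReader.readField above copies whole bytes into the vector when the previous read ended on a byte boundary, and otherwise splices each output byte from two neighbouring input bytes using the startBitMasks/endBitMasks tables, carrying pageReader.bitShift forward into the next call. The following is a simplified per-value sketch of the same idea, assuming Parquet's plain BOOLEAN encoding (bit-packed, least significant bit first); the real reader works byte-at-a-time for speed, and BitUnpack here is purely illustrative.

class BitUnpack {
  // Unpack `count` booleans from `data`, starting `bitOffset` bits in.
  // The caller keeps (bitOffset + count) % 8 as the offset for the next read,
  // which is what BitReader stores in pageReader.bitShift.
  static boolean[] unpackBits(byte[] data, int bitOffset, int count) {
    boolean[] out = new boolean[count];
    for (int i = 0; i < count; i++) {
      int bit = bitOffset + i;
      out[i] = ((data[bit >>> 3] >>> (bit & 7)) & 1) == 1;
    }
    return out;
  }

  public static void main(String[] args) {
    byte[] page = { (byte) 0b10110100, (byte) 0b00000001 };
    // A previous pass read 3 values, so this batch starts with bitOffset = 3.
    System.out.println(java.util.Arrays.toString(unpackBits(page, 3, 6)));
  }
}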
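The DECIMAL branches in ColumnReaderFactory above select a reader by the column's declared byte width: type_length <= 12 maps to the Decimal28 readers and type_length <= 16 to the Decimal38 ones. That split presumably follows from how many base-10 digits a signed two's-complement value of each width can always hold: 2^95 is roughly 3.96e28 and 2^127 roughly 1.7e38, so 12 bytes safely cover 28 digits and 16 bytes cover 38. A quick check, in plain Java with nothing Drill-specific:

import java.math.BigInteger;

class DecimalWidths {
  public static void main(String[] args) {
    // Largest magnitude a signed value of the given byte width can represent.
    BigInteger max96  = BigInteger.ONE.shiftLeft(95).subtract(BigInteger.ONE);   // 12 bytes
    BigInteger max128 = BigInteger.ONE.shiftLeft(127).subtract(BigInteger.ONE);  // 16 bytes
    System.out.println(max96);   // 29 digits printed, so 28 digits always fit
    System.out.println(max128);  // 39 digits printed, so 38 digits always fit
  }
}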
