http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java new file mode 100644 index 0000000..7eeeeaa --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java @@ -0,0 +1,236 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.types.TypeProtos; +import static org.apache.drill.common.types.TypeProtos.MinorType; +import static org.apache.drill.common.types.TypeProtos.DataMode; +import static parquet.Preconditions.checkArgument; + +import org.apache.drill.common.types.Types; +import parquet.format.ConvertedType; +import parquet.format.SchemaElement; +import parquet.schema.PrimitiveType; + +public class ParquetToDrillTypeConverter { + + private static TypeProtos.MinorType getDecimalType(SchemaElement schemaElement) { + return schemaElement.getPrecision() <= 28 ? 
TypeProtos.MinorType.DECIMAL28SPARSE : MinorType.DECIMAL38SPARSE; + } + + public static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length, + TypeProtos.DataMode mode, SchemaElement schemaElement) { + ConvertedType convertedType = schemaElement.getConverted_type(); + switch (mode) { + + case OPTIONAL: + switch (primitiveTypeName) { + case BINARY: + if (convertedType == null) { + return Types.optional(TypeProtos.MinorType.VARBINARY); + } + switch (convertedType) { + case UTF8: + return Types.optional(TypeProtos.MinorType.VARCHAR); + case DECIMAL: + return Types.withScaleAndPrecision(getDecimalType(schemaElement), TypeProtos.DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision()); + default: + throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); + } + case INT64: + if (convertedType == null) { + return Types.optional(TypeProtos.MinorType.BIGINT); + } + switch(convertedType) { + case DECIMAL: + return Types.withScaleAndPrecision(TypeProtos.MinorType.DECIMAL18, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision()); + case FINETIME: + throw new UnsupportedOperationException(); + case TIMESTAMP: + return Types.optional(MinorType.TIMESTAMP); + default: + throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); + } + case INT32: + if (convertedType == null) { + return Types.optional(TypeProtos.MinorType.INT); + } + switch(convertedType) { + case DECIMAL: + return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision()); + case DATE: + return Types.optional(MinorType.DATE); + case TIME: + return Types.optional(MinorType.TIME); + default: + throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); + } + case BOOLEAN: + return Types.optional(TypeProtos.MinorType.BIT); + case FLOAT: + return Types.optional(TypeProtos.MinorType.FLOAT4); + case DOUBLE: + return Types.optional(TypeProtos.MinorType.FLOAT8); + // TODO - Both of these are not supported by the parquet library yet (7/3/13), + // but they are declared here for when they are implemented + case INT96: + return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12) + .setMode(mode).build(); + case FIXED_LEN_BYTE_ARRAY: + if (convertedType == null) { + checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); + return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY) + .setWidth(length).setMode(mode).build(); + } else if (convertedType == ConvertedType.DECIMAL) { + return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision()); + } + default: + throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName); + } + case REQUIRED: + switch (primitiveTypeName) { + case BINARY: + if (convertedType == null) { + return Types.required(TypeProtos.MinorType.VARBINARY); + } + switch (convertedType) { + case UTF8: + return Types.required(MinorType.VARCHAR); + case DECIMAL: + return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision()); + default: + throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, 
convertedType)); + } + case INT64: + if (convertedType == null) { + return Types.required(MinorType.BIGINT); + } + switch(convertedType) { + case DECIMAL: + return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision()); + case FINETIME: + throw new UnsupportedOperationException(); + case TIMESTAMP: + return Types.required(MinorType.TIMESTAMP); + default: + throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); + } + case INT32: + if (convertedType == null) { + return Types.required(MinorType.INT); + } + switch(convertedType) { + case DECIMAL: + return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision()); + case DATE: + return Types.required(MinorType.DATE); + case TIME: + return Types.required(MinorType.TIME); + default: + throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); + } + case BOOLEAN: + return Types.required(TypeProtos.MinorType.BIT); + case FLOAT: + return Types.required(TypeProtos.MinorType.FLOAT4); + case DOUBLE: + return Types.required(TypeProtos.MinorType.FLOAT8); + // Both of these are not supported by the parquet library yet (7/3/13), + // but they are declared here for when they are implemented + case INT96: + return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12) + .setMode(mode).build(); + case FIXED_LEN_BYTE_ARRAY: + if (convertedType == null) { + checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); + return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY) + .setWidth(length).setMode(mode).build(); + } else if (convertedType == ConvertedType.DECIMAL) { + return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision()); + } + default: + throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName); + } + case REPEATED: + switch (primitiveTypeName) { + case BINARY: + if (convertedType == null) { + return Types.repeated(TypeProtos.MinorType.VARBINARY); + } + switch (schemaElement.getConverted_type()) { + case UTF8: + return Types.repeated(MinorType.VARCHAR); + case DECIMAL: + return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision()); + default: + throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); + } + case INT64: + if (convertedType == null) { + return Types.repeated(MinorType.BIGINT); + } + switch(convertedType) { + case DECIMAL: + return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision()); + case FINETIME: + throw new UnsupportedOperationException(); + case TIMESTAMP: + return Types.repeated(MinorType.TIMESTAMP); + default: + throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); + } + case INT32: + if (convertedType == null) { + return Types.repeated(MinorType.INT); + } + switch(convertedType) { + case DECIMAL: + return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision()); + case DATE: + return Types.repeated(MinorType.DATE); + case TIME: + 
return Types.repeated(MinorType.TIME); + default: + throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType)); + } + case BOOLEAN: + return Types.repeated(TypeProtos.MinorType.BIT); + case FLOAT: + return Types.repeated(TypeProtos.MinorType.FLOAT4); + case DOUBLE: + return Types.repeated(TypeProtos.MinorType.FLOAT8); + // Both of these are not supported by the parquet library yet (7/3/13), + // but they are declared here for when they are implemented + case INT96: + return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12) + .setMode(mode).build(); + case FIXED_LEN_BYTE_ARRAY: + if (convertedType == null) { + checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); + return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY) + .setWidth(length).setMode(mode).build(); + } else if (convertedType == ConvertedType.DECIMAL) { + return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision()); + } + default: + throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName); + } + } + throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName + " Mode: " + mode); + } +}
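For reference, the mapping above can be exercised directly, since toMajorType() is a public static helper. The following is a minimal, hypothetical sketch (not part of this commit): it assumes the Thrift-generated setters on parquet.format.SchemaElement (setConverted_type, setPrecision, setScale) and the protobuf-generated accessors on TypeProtos.MajorType (getMinorType, getScale, getPrecision); the class name TypeConversionExample is invented for illustration. With a precision of 30, getDecimalType() should select DECIMAL38SPARSE, since only precisions of 28 or less map to DECIMAL28SPARSE.

import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetToDrillTypeConverter;
import parquet.format.ConvertedType;
import parquet.format.SchemaElement;
import parquet.schema.PrimitiveType;

public class TypeConversionExample {
  public static void main(String[] args) {
    // Describe a BINARY column annotated as DECIMAL(30, 2).
    SchemaElement decimalCol = new SchemaElement();
    decimalCol.setConverted_type(ConvertedType.DECIMAL); // assumed Thrift-generated setter
    decimalCol.setPrecision(30);                         // assumed Thrift-generated setter
    decimalCol.setScale(2);                              // assumed Thrift-generated setter

    // The length argument is only meaningful for FIXED_LEN_BYTE_ARRAY, so 0 is passed here.
    TypeProtos.MajorType drillType = ParquetToDrillTypeConverter.toMajorType(
        PrimitiveType.PrimitiveTypeName.BINARY, 0, TypeProtos.DataMode.OPTIONAL, decimalCol);

    // Expected: DECIMAL38SPARSE in OPTIONAL mode, with scale 2 and precision 30.
    System.out.println(drillType.getMinorType() + " scale=" + drillType.getScale()
        + " precision=" + drillType.getPrecision());
  }
}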
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java new file mode 100644 index 0000000..409f17d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import java.io.IOException; +import java.util.List; + +public class VarLenBinaryReader { + + ParquetRecordReader parentReader; + final List<VarLengthColumn> columns; + + public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns){ + this.parentReader = parentReader; + this.columns = columns; + } + + /** + * Reads as many variable length values as possible. + * + * @param recordsToReadInThisPass - the number of records recommended for reading from the reader + * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metadata from + * @return - the number of records read in the current pass + * @throws IOException + */ + public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException { + + long recordsReadInCurrentPass = 0; + int lengthVarFieldsInCurrentRecord; + long totalVariableLengthData = 0; + boolean exitLengthDeterminingLoop = false; + // write the first 0 offset + for (VarLengthColumn columnReader : columns) { + columnReader.reset(); + } + + do { + lengthVarFieldsInCurrentRecord = 0; + for (VarLengthColumn columnReader : columns) { + if ( ! 
exitLengthDeterminingLoop ) + exitLengthDeterminingLoop = columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord); + else + break; + } + // check that the next record will fit in the batch + if (exitLengthDeterminingLoop || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + totalVariableLengthData + + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()){ + break; + } + for (VarLengthColumn columnReader : columns ) { + columnReader.updateReadyToReadPosition(); + columnReader.currDefLevel = -1; + } + recordsReadInCurrentPass++; + totalVariableLengthData += lengthVarFieldsInCurrentRecord; + } while (recordsReadInCurrentPass < recordsToReadInThisPass); + + for (VarLengthColumn columnReader : columns) { + columnReader.readRecords(columnReader.pageReader.valuesReadyToRead); + } + for (VarLengthColumn columnReader : columns) { + columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass); + } + return recordsReadInCurrentPass; + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java new file mode 100644 index 0000000..14ee631 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java @@ -0,0 +1,60 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.vector.ValueVector; +import parquet.column.ColumnDescriptor; +import parquet.format.Encoding; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; +import parquet.io.api.Binary; + +import java.io.IOException; + +public abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLengthColumn.class); + + Binary currDictVal; + + VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { + usingDictionary = true; + } + else { + usingDictionary = false; + } + } + + protected boolean processPageData(int recordsToReadInThisPass) throws IOException { + return readAndStoreValueSizeInformation(); + } + + public void reset() { + super.reset(); + pageReader.valuesReadyToRead = 0; + } + + protected abstract boolean readAndStoreValueSizeInformation() throws IOException; + + public abstract boolean skipReadyToReadPositionUpdate(); + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java new file mode 100644 index 0000000..979e8c3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java @@ -0,0 +1,294 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.store.parquet.columnreaders; + +import java.math.BigDecimal; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.util.DecimalUtility; +import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; +import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; +import org.apache.drill.exec.vector.Decimal28SparseVector; +import org.apache.drill.exec.vector.Decimal38SparseVector; +import org.apache.drill.exec.vector.NullableDecimal28SparseVector; +import org.apache.drill.exec.vector.NullableDecimal38SparseVector; +import org.apache.drill.exec.vector.NullableVarBinaryVector; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.VarBinaryVector; +import org.apache.drill.exec.vector.VarCharVector; +import parquet.column.ColumnDescriptor; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; + +public class VarLengthColumnReaders { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLengthColumnReaders.class); + + public static class Decimal28Column extends VarLengthValuesColumn<Decimal28SparseVector> { + + protected Decimal28SparseVector decimal28Vector; + + Decimal28Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal28SparseVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + this.decimal28Vector = v; + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + int width = Decimal28SparseHolder.WIDTH; + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); + if (index >= decimal28Vector.getValueCapacity()) { + return false; + } + DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(), + schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits); + return true; + } + + @Override + public int capacity() { + return decimal28Vector.getData().capacity(); + } + } + + public static class NullableDecimal28Column extends NullableVarLengthValuesColumn<NullableDecimal28SparseVector> { + + protected NullableDecimal28SparseVector nullableDecimal28Vector; + + NullableDecimal28Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal28SparseVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + nullableDecimal28Vector = v; + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + int width = Decimal28SparseHolder.WIDTH; + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); + if (index >= nullableDecimal28Vector.getValueCapacity()) { + return false; + } + DecimalUtility.getSparseFromBigDecimal(intermediate, nullableDecimal28Vector.getData(), index * width, schemaElement.getScale(), + schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits); + nullableDecimal28Vector.getMutator().setIndexDefined(index); + 
return true; + } + + @Override + public int capacity() { + return nullableDecimal28Vector.getData().capacity(); + } + } + + public static class Decimal38Column extends VarLengthValuesColumn<Decimal38SparseVector> { + + protected Decimal38SparseVector decimal28Vector; + + Decimal38Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal38SparseVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + decimal28Vector = v; + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + int width = Decimal38SparseHolder.WIDTH; + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); + if (index >= decimal28Vector.getValueCapacity()) { + return false; + } + DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(), + schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits); + return true; + } + + @Override + public int capacity() { + return decimal28Vector.getData().capacity(); + } + } + + public static class NullableDecimal38Column extends NullableVarLengthValuesColumn<NullableDecimal38SparseVector> { + + protected NullableDecimal38SparseVector nullableDecimal38Vector; + + NullableDecimal38Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal38SparseVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + nullableDecimal38Vector = v; + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + int width = Decimal38SparseHolder.WIDTH; + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); + if (index >= nullableDecimal38Vector.getValueCapacity()) { + return false; + } + DecimalUtility.getSparseFromBigDecimal(intermediate, nullableDecimal38Vector.getData(), index * width, schemaElement.getScale(), + schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits); + nullableDecimal38Vector.getMutator().setIndexDefined(index); + return true; + } + + @Override + public int capacity() { + return nullableDecimal38Vector.getData().capacity(); + } + } + + + public static class VarCharColumn extends VarLengthValuesColumn<VarCharVector> { + + // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting + protected VarCharVector varCharVector; + + VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + varCharVector = v; + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + boolean success; + if(index >= varCharVector.getValueCapacity()) return false; + + if (usingDictionary) { + success = varCharVector.getMutator().setSafe(index, currDictValToWrite.getBytes(), + 0, currDictValToWrite.length()); + } + else { + success = 
varCharVector.getMutator().setSafe(index, bytes, start, length); + } + return success; + } + + @Override + public int capacity() { + return varCharVector.getData().capacity(); + } + } + + public static class NullableVarCharColumn extends NullableVarLengthValuesColumn<NullableVarCharVector> { + + int nullsRead; + boolean currentValNull = false; + // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting + protected NullableVarCharVector nullableVarCharVector; + + NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + nullableVarCharVector = v; + } + + public boolean setSafe(int index, byte[] value, int start, int length) { + boolean success; + if(index >= nullableVarCharVector.getValueCapacity()) return false; + + if (usingDictionary) { + success = nullableVarCharVector.getMutator().setSafe(index, currDictValToWrite.getBytes(), + 0, currDictValToWrite.length()); + } + else { + success = nullableVarCharVector.getMutator().setSafe(index, value, start, length); + } + return success; + } + + @Override + public int capacity() { + return nullableVarCharVector.getData().capacity(); + } + } + + public static class VarBinaryColumn extends VarLengthValuesColumn<VarBinaryVector> { + + // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting + protected VarBinaryVector varBinaryVector; + + VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + varBinaryVector = v; + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + boolean success; + if(index >= varBinaryVector.getValueCapacity()) return false; + + if (usingDictionary) { + success = varBinaryVector.getMutator().setSafe(index, currDictValToWrite.getBytes(), + 0, currDictValToWrite.length()); + } + else { + success = varBinaryVector.getMutator().setSafe(index, bytes, start, length); + } + return success; + } + + @Override + public int capacity() { + return varBinaryVector.getData().capacity(); + } + } + + public static class NullableVarBinaryColumn extends NullableVarLengthValuesColumn<NullableVarBinaryVector> { + + int nullsRead; + boolean currentValNull = false; + // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting + protected org.apache.drill.exec.vector.NullableVarBinaryVector nullableVarBinaryVector; + + NullableVarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + nullableVarBinaryVector = v; + } + + public boolean setSafe(int index, byte[] value, int start, int length) { + boolean success; + if(index >= nullableVarBinaryVector.getValueCapacity()) return false; + 
+ if (usingDictionary) { + success = nullableVarBinaryVector.getMutator().setSafe(index, currDictValToWrite.getBytes(), + 0, currDictValToWrite.length()); + } + else { + success = nullableVarBinaryVector.getMutator().setSafe(index, value, start, length); + } + return success; + } + + @Override + public int capacity() { + return nullableVarBinaryVector.getData().capacity(); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java new file mode 100644 index 0000000..092c186 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java @@ -0,0 +1,96 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VariableWidthVector; +import parquet.bytes.BytesUtils; +import parquet.column.ColumnDescriptor; +import parquet.format.Encoding; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; +import parquet.io.api.Binary; + +import java.io.IOException; + +public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLengthColumn { + + Binary currLengthDeterminingDictVal; + Binary currDictValToWrite; + VariableWidthVector variableWidthVector; + + VarLengthValuesColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + variableWidthVector = (VariableWidthVector) valueVec; + if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { + usingDictionary = true; + } + else { + usingDictionary = false; + } + } + + public abstract boolean setSafe(int index, byte[] bytes, int start, int length); + + @Override + protected void readField(long recordToRead) { + dataTypeLengthInBits = variableWidthVector.getAccessor().getValueLength(valuesReadInCurrentPass); + // again, I am re-purposing the unused field here, it is a length in BYTES, not bits + boolean success = setSafe((int) valuesReadInCurrentPass, pageReader.pageDataByteArray, + (int) pageReader.readPosInBytes + 4, dataTypeLengthInBits); + assert success; + updatePosition(); + } + + public void updateReadyToReadPosition() { + pageReader.readyToReadPosInBytes += dataTypeLengthInBits + 4; + pageReader.valuesReadyToRead++; + currLengthDeterminingDictVal = null; + } + + public void updatePosition() { + pageReader.readPosInBytes += dataTypeLengthInBits + 4; + bytesReadInCurrentPass += dataTypeLengthInBits; + valuesReadInCurrentPass++; + } + + public boolean skipReadyToReadPositionUpdate() { + return false; + } + + protected boolean readAndStoreValueSizeInformation() throws IOException { + // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division + try { + dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray, + (int) pageReader.readyToReadPosInBytes); + } catch (Throwable t) { + throw t; + } + + // this should not fail + if (!variableWidthVector.getMutator().setValueLengthSafe((int) valuesReadInCurrentPass + pageReader.valuesReadyToRead, + dataTypeLengthInBits)) { + return true; + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java index 9b0a6cd..6d03541 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java @@ -36,4 +36,17 @@ public interface 
RepeatedFixedWidthVector extends ValueVector{ * @return The number of bytes of the buffer that were consumed. */ public int load(int parentValueCount, int childValueCount, ByteBuf buf); + + public abstract RepeatedAccessor getAccessor(); + + public abstract RepeatedMutator getMutator(); + + public interface RepeatedAccessor extends Accessor { + public int getGroupCount(); + } + public interface RepeatedMutator extends Mutator { + public void setValueCounts(int parentValueCount, int childValueCount); + public boolean setRepetitionAtIndexSafe(int index, int repetitionCount); + public BaseDataValueVector getDataVector(); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java index bd03038..a2c884e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java @@ -35,6 +35,8 @@ public interface RepeatedVariableWidthVector extends ValueVector{ */ public int getByteCapacity(); + public abstract RepeatedFixedWidthVector.RepeatedAccessor getAccessor(); + /** * Load the records in the provided buffer based on the given number of values. * @param dataBytes The number of bytes associated with the data array. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java index 2b07750..6660351 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java @@ -44,5 +44,15 @@ public interface VariableWidthVector extends ValueVector{ */ public int load(int dataBytes, int valueCount, ByteBuf buf); - public abstract Mutator getMutator(); + public abstract VariableWidthMutator getMutator(); + + public abstract VariableWidthAccessor getAccessor(); + + public interface VariableWidthAccessor extends Accessor { + public int getValueLength(int index); + } + + public interface VariableWidthMutator extends Mutator { + public boolean setValueLengthSafe(int index, int length); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java index ef8aef8..d43bf59 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java @@ -38,6 +38,7 @@ import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; 
import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.util.JsonStringArrayList; +import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.RepeatedFixedWidthVector; import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.ValueVector; @@ -54,7 +55,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea private final UInt4Vector offsets; // offsets to start of each record private final BufferAllocator allocator; private final Mutator mutator = new Mutator(); - private final Accessor accessor = new Accessor(); + private final RepeatedListAccessor accessor = new RepeatedListAccessor(); private ValueVector vector; private final MaterializedField field; private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this); @@ -112,7 +113,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea } - public class Mutator implements ValueVector.Mutator{ + public class Mutator implements ValueVector.Mutator, RepeatedMutator{ public void startNewGroup(int index) { offsets.getMutator().set(index+1, offsets.getAccessor().get(index)); @@ -151,9 +152,24 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea public void generateTestData(int values) { } + @Override + public void setValueCounts(int parentValueCount, int childValueCount) { + // TODO - determine if this should be implemented for this class + throw new UnsupportedOperationException(); + } + + @Override + public boolean setRepetitionAtIndexSafe(int index, int repetitionCount) { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public BaseDataValueVector getDataVector() { + return null; //To change body of implemented methods use File | Settings | File Templates. 
+ } } - public class Accessor implements ValueVector.Accessor { + public class RepeatedListAccessor implements RepeatedAccessor{ @Override public Object getObject(int index) { @@ -211,6 +227,10 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea return reader; } + @Override + public int getGroupCount() { + return size(); + } } @Override @@ -315,7 +335,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea } @Override - public Accessor getAccessor() { + public RepeatedListAccessor getAccessor() { return accessor; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java index f05ab1b..30f5fc7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.util.JsonStringArrayList; +import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.RepeatedFixedWidthVector; import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.ValueVector; @@ -50,6 +51,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; public class RepeatedMapVector extends AbstractContainerVector implements RepeatedFixedWidthVector { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedMapVector.class); public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REPEATED).build(); @@ -59,7 +61,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap(); private final RepeatedMapReaderImpl reader = new RepeatedMapReaderImpl(RepeatedMapVector.this); private final IntObjectOpenHashMap<ValueVector> vectorsById = new IntObjectOpenHashMap<>(); - private final Accessor accessor = new Accessor(); + private final RepeatedMapAccessor accessor = new RepeatedMapAccessor(); private final Mutator mutator = new Mutator(); private final BufferAllocator allocator; private final MaterializedField field; @@ -278,7 +280,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat } @Override - public Accessor getAccessor() { + public RepeatedMapAccessor getAccessor() { return accessor; } @@ -349,7 +351,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat return mutator; } - public class Accessor implements ValueVector.Accessor{ + public class RepeatedMapAccessor implements RepeatedAccessor { @Override public Object getObject(int index) { @@ -414,6 +416,10 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat return reader; } + @Override + public int getGroupCount() { + return size(); + } } private void populateEmpties(int groupCount){ @@ -424,7 +430,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements 
Repeat lastSet = groupCount - 1; } - public class Mutator implements ValueVector.Mutator{ + public class Mutator implements ValueVector.Mutator, RepeatedMutator { public void startNewGroup(int index) { populateEmpties(index); @@ -458,6 +464,21 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat public void generateTestData(int values) { } + @Override + public void setValueCounts(int parentValueCount, int childValueCount) { + // TODO - determine if this should be implemented for this class + throw new UnsupportedOperationException(); + } + + @Override + public boolean setRepetitionAtIndexSafe(int index, int repetitionCount) { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public BaseDataValueVector getDataVector() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index aa2b66f..1cb0d06 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.drill.BaseTestQuery; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; @@ -39,14 +40,13 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -@Ignore +import java.io.UnsupportedEncodingException; + public class TestParquetWriter extends BaseTestQuery { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestParquetWriter.class); static FileSystem fs; - private static final String EMPLOYEE_PARQUET_PATH = "employee_parquet"; - @BeforeClass public static void initFs() throws Exception { Configuration conf = new Configuration(); @@ -59,7 +59,7 @@ public class TestParquetWriter extends BaseTestQuery { public void testSimple() throws Exception { String selection = "*"; String inputTable = "cp.`employee.json`"; - runTestAndValidate(selection, selection, inputTable, EMPLOYEE_PARQUET_PATH); + runTestAndValidate(selection, selection, inputTable, "employee_parquet"); } @Test @@ -90,24 +90,13 @@ public class TestParquetWriter extends BaseTestQuery { @Test public void testTPCHReadWrite1_date_convertedType() throws Exception { String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " + - "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as COMMITDATE, cast(L_RECEIPTDATE as DATE) AS RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT"; + "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as L_COMMITDATE, cast(L_RECEIPTDATE as DATE) AS L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT"; String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, 
L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " + - "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE,COMMITDATE ,RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT"; + "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE,L_COMMITDATE ,L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT"; String inputTable = "cp.`tpch/lineitem.parquet`"; runTestAndValidate(selection, validationSelection, inputTable, "lineitem_parquet"); } - // TODO file a JIRA for running this query with the projected column names the same as the originals, it failed with a deadbuf - // on the client, it appeared that the projection was sending batches out with a record count but a deadbuf - /* - String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " + - "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as L_COMMITDATE, cast(L_RECEIPTDATE as DATE) AS L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT"; - String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " + - "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE,COMMITDATE ,RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT"; - */ - // this is rather odd, I can select the data out fo parquet and project it to cast the date fields - // this stores all of the data correctly, but when I got to read it out again with the query that created it (with redudant casts I beleive) it has - // everything but the cast date columns as nulls @Test public void testTPCHReadWrite2() throws Exception { String inputTable = "cp.`tpch/customer.parquet`"; @@ -144,34 +133,65 @@ public class TestParquetWriter extends BaseTestQuery { runTestAndValidate("*", "*", inputTable, "region_parquet"); } - // This test fails an asset in OperatorStats intermittently @Test public void testTPCHReadWrite8() throws Exception { String inputTable = "cp.`tpch/supplier.parquet`"; runTestAndValidate("*", "*", inputTable, "supplier_parquet"); } + // working to create an exhaustive test of the format for this one. including all convertedTypes + // will not be supporting interval for Beta as of current schedule + // Types left out: + // "TIMESTAMPTZ_col" + @Test + public void testRepeated() throws Exception { + String inputTable = "cp.`parquet/basic_repeated.json`"; + runTestAndValidate("*", "*", inputTable, "basic_repeated"); + } + + // TODO - this is failing due to the parquet behavior of allowing repeated values to reach across + // pages. This broke our reading model a bit, but it is possible to work around. 
+ @Test + public void testRepeatedDouble() throws Exception { + String inputTable = "cp.`parquet/repeated_double_data.json`"; + runTestAndValidate("*", "*", inputTable, "repeated_double_parquet"); + } + + @Test + public void testRepeatedLong() throws Exception { + String inputTable = "cp.`parquet/repeated_integer_data.json`"; + runTestAndValidate("*", "*", inputTable, "repeated_int_parquet"); + } + + @Test + public void testRepeatedBool() throws Exception { + String inputTable = "cp.`parquet/repeated_bool_data.json`"; + runTestAndValidate("*", "*", inputTable, "repeated_bool_parquet"); + } + + @Test + public void testNullReadWrite() throws Exception { + String inputTable = "cp.`parquet/null_test_data.json`"; + runTestAndValidate("*", "*", inputTable, "nullable_test"); + } + + @Ignore // fails intermittenly when being run with other tests, a patch in DRILL @Test - @Ignore public void testDecimal() throws Exception { String selection = "cast(salary as decimal(8,2)) as decimal8, cast(salary as decimal(15,2)) as decimal15, " + "cast(salary as decimal(24,2)) as decimal24, cast(salary as decimal(38,2)) as decimal38"; String validateSelection = "decimal8, decimal15, decimal24, decimal38"; String inputTable = "cp.`employee.json`"; - runTestAndValidate(selection, validateSelection, inputTable, EMPLOYEE_PARQUET_PATH); + runTestAndValidate(selection, validateSelection, inputTable, "parquet_decimal"); } - // TODO - ask jacques about OperatorStats - // this is also experiencing the same failure as the 8th tpch dataset test above when run with the rest of the tests - // in this class all at once, not sure if this is IDE related for resorce management or something that should be looked - // at. @Test public void testMulipleRowGroups() throws Exception { try { //test(String.format("ALTER SESSION SET `%s` = %d", ExecConstants.PARQUET_BLOCK_SIZE, 1*1024*1024)); - String selection = "*"; + String selection = "mi"; String inputTable = "cp.`customer.json`"; - runTestAndValidate(selection, selection, inputTable, EMPLOYEE_PARQUET_PATH); + runTestAndValidate(selection, selection, inputTable, "foodmart_customer_parquet"); } finally { test(String.format("ALTER SESSION SET `%s` = %d", ExecConstants.PARQUET_BLOCK_SIZE, 512*1024*1024)); } @@ -183,7 +203,7 @@ public class TestParquetWriter extends BaseTestQuery { String selection = "cast(hire_date as DATE) as hire_date"; String validateSelection = "hire_date"; String inputTable = "cp.`employee.json`"; - runTestAndValidate(selection, validateSelection, inputTable, EMPLOYEE_PARQUET_PATH); + runTestAndValidate(selection, validateSelection, inputTable, "foodmart_employee_parquet"); } public void runTestAndValidate(String selection, String validationSelection, String inputTable, String outputFile) throws Exception { @@ -219,12 +239,7 @@ public class TestParquetWriter extends BaseTestQuery { for (int i = 0; i < loader.getRecordCount(); i++) { HashMap<String, Object> record = new HashMap<>(); for (VectorWrapper w : loader) { - Object obj = null; - try { - obj = w.getValueVector().getAccessor().getObject(i); - } catch (Exception ex) { - throw ex; - } + Object obj = w.getValueVector().getAccessor().getObject(i); if (obj != null) { if (obj instanceof Text) { obj = obj.toString(); @@ -235,6 +250,7 @@ public class TestParquetWriter extends BaseTestQuery { else if (obj instanceof byte[]) { obj = new String((byte[]) obj, "UTF-8"); } + record.put(w.getField().toExpr(), obj); } record.put(w.getField().toExpr(), obj); } @@ -252,13 +268,15 @@ public class TestParquetWriter extends 
BaseTestQuery { RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); addToMaterializedResults(expectedRecords, expected, loader, schema); addToMaterializedResults(actualRecords, result, loader, schema); - Assert.assertEquals("Different number of objects returned", expectedRecords.size(), actualRecords.size()); + Assert.assertEquals("Different number of records returned", expectedRecords.size(), actualRecords.size()); String missing = ""; int i = 0; + int counter = 0; int missmatch; for (Map<String, Object> record : expectedRecords) { missmatch = 0; + counter++; for (String column : record.keySet()) { if ( actualRecords.get(i).get(column) == null && expectedRecords.get(i).get(column) == null ) { continue; @@ -267,7 +285,7 @@ public class TestParquetWriter extends BaseTestQuery { continue; if ( (actualRecords.get(i).get(column) == null && record.get(column) == null) || ! actualRecords.get(i).get(column).equals(record.get(column))) { missmatch++; - System.out.println( i + " " + column + "[ex: " + record.get(column) + ", actual:" + actualRecords.get(i).get(column) + "]"); + System.out.println( counter + " " + column + "[ex: " + record.get(column) + ", actual:" + actualRecords.get(i).get(column) + "]"); } } if ( ! actualRecords.remove(record)) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 3e679bb..2193233 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.parquet; -import static org.apache.drill.exec.store.parquet.TestFileGenerator.intVals; import static org.apache.drill.exec.store.parquet.TestFileGenerator.populateFieldInfoMap; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -46,16 +45,17 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.proto.BitControl; import org.apache.drill.exec.proto.UserBitShared.QueryType; -import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.rpc.user.UserServer; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.CachedSingleFileSystem; import org.apache.drill.exec.store.TestOutputMutator; +import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; @@ -127,7 +127,15 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ readEntries = "\"/tmp/lineitem_null_dict.parquet\""; String planText = Files.toString(FileUtils.getResourceAsFile("/parquet/parquet_scan_screen_read_entry_replace.json"), Charsets.UTF_8).replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries); - testParquetFullEngineLocalText(planText, 
fileName, 1, 1, 100000, false); + //testParquetFullEngineLocalText(planText, fileName, 1, 1, 100000, false); + + testFull(QueryType.SQL, "select L_RECEIPTDATE from dfs.`/tmp/lineitem_null_dict.parquet`", "", 1, 1, 100000, false); + } + + @Ignore + @Test + public void testDictionaryError_419() throws Exception { + testFull(QueryType.SQL, "select c_address from dfs.`/tmp/customer_snappyimpala_drill_419.parquet`", "", 1, 1, 150000, false); } @Test @@ -265,7 +273,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ @Test public void testMultipleRowGroups() throws Exception { HashMap<String, FieldInfo> fields = new HashMap<>(); - ParquetTestProperties props = new ParquetTestProperties(3, 3000, DEFAULT_BYTES_PER_PAGE, fields); + ParquetTestProperties props = new ParquetTestProperties(2, 300, DEFAULT_BYTES_PER_PAGE, fields); populateFieldInfoMap(props); testParquetFullEngineEventBased(true, "/parquet/parquet_scan_screen.json", "/tmp/test.parquet", 1, props); } @@ -277,7 +285,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ @Test public void testNullableColumns() throws Exception { HashMap<String, FieldInfo> fields = new HashMap<>(); - ParquetTestProperties props = new ParquetTestProperties(1, 3000000, DEFAULT_BYTES_PER_PAGE, fields); + ParquetTestProperties props = new ParquetTestProperties(1, 1500000, DEFAULT_BYTES_PER_PAGE, fields); Object[] boolVals = {true, null, null}; props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals, TypeProtos.MinorType.BIT, props)); testParquetFullEngineEventBased(false, "/parquet/parquet_nullable.json", "/tmp/nullable_test.parquet", 1, props); @@ -289,23 +297,38 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ * Tests the reading of nullable var length columns, runs the tests twice, once on a file that has * a converted type of UTF-8 to make sure it can be read */ - public void testNullableColumnsVarLen() throws Exception { +public void testNullableColumnsVarLen() throws Exception { HashMap<String, FieldInfo> fields = new HashMap<>(); - ParquetTestProperties props = new ParquetTestProperties(1, 3000000, DEFAULT_BYTES_PER_PAGE, fields); + ParquetTestProperties props = new ParquetTestProperties(1, 300000, DEFAULT_BYTES_PER_PAGE, fields); byte[] val = {'b'}; byte[] val2 = {'b', '2'}; byte[] val3 = {'b', '3'}; byte[] val4 = { 'l','o','n','g','e','r',' ','s','t','r','i','n','g'}; - Object[] boolVals = { val, val2, val4}; - props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals, TypeProtos.MinorType.BIT, props)); + Object[] byteArrayVals = { val, val2, val4}; + props.fields.put("a", new FieldInfo("boolean", "a", 1, byteArrayVals, TypeProtos.MinorType.BIT, props)); testParquetFullEngineEventBased(false, "/parquet/parquet_nullable_varlen.json", "/tmp/nullable_varlen.parquet", 1, props); - fields.clear(); + HashMap<String, FieldInfo> fields2 = new HashMap<>(); // pass strings instead of byte arrays - Object[] boolVals2 = { new org.apache.hadoop.io.Text("b"), new org.apache.hadoop.io.Text("b2"), + Object[] textVals = { new org.apache.hadoop.io.Text("b"), new org.apache.hadoop.io.Text("b2"), new org.apache.hadoop.io.Text("b3")}; - props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals2, TypeProtos.MinorType.BIT, props)); + ParquetTestProperties props2 = new ParquetTestProperties(1, 30000, DEFAULT_BYTES_PER_PAGE, fields2); + props2.fields.put("a", new FieldInfo("boolean", "a", 1, textVals, TypeProtos.MinorType.BIT, props2)); + testParquetFullEngineEventBased(false, 
"/parquet/parquet_scan_screen_read_entry_replace.json", + "\"/tmp/varLen.parquet/a\"", "unused", 1, props2); + + } + + @Ignore + @Test + public void testFileWithNulls() throws Exception { + HashMap<String, FieldInfo> fields3 = new HashMap<>(); + ParquetTestProperties props3 = new ParquetTestProperties(1, 3000, DEFAULT_BYTES_PER_PAGE, fields3); + // actually include null values + Object[] valuesWithNull = {new Text(""), new Text("longer string"), null}; + props3.fields.put("a", new FieldInfo("boolean", "a", 1, valuesWithNull, TypeProtos.MinorType.BIT, props3)); testParquetFullEngineEventBased(false, "/parquet/parquet_scan_screen_read_entry_replace.json", - "\"/tmp/varLen.parquet/a\"", "unused", 1, props); + "\"/tmp/nullable_with_nulls.parquet\"", "unused", 1, props3); + } @Ignore @@ -319,7 +342,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ props.fields.put("n_regionkey", null); props.fields.put("n_comment", null); testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", - "\"/tmp/nation_dictionary_fail.parquet\"", "unused", 1, props, true); + "\"/tmp/nation_dictionary_fail.parquet\"", "unused", 1, props, QueryType.LOGICAL); fields = new HashMap<>(); props = new ParquetTestProperties(1, 5, DEFAULT_BYTES_PER_PAGE, fields); @@ -332,7 +355,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ props.fields.put("height", null); props.fields.put("hair_thickness", null); testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", - "\"/tmp/employees_5_16_14.parquet\"", "unused", 1, props, true); + "\"/tmp/employees_5_16_14.parquet\"", "unused", 1, props, QueryType.LOGICAL); } @Test @@ -352,25 +375,36 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ "/tmp/test.parquet", i, props); } + @Ignore @Test public void testReadError_Drill_901() throws Exception { // select cast( L_COMMENT as varchar) from dfs_test.`/tmp/drilltest/employee_parquet` HashMap<String, FieldInfo> fields = new HashMap<>(); - ParquetTestProperties props = new ParquetTestProperties(1, 120350, DEFAULT_BYTES_PER_PAGE, fields); + ParquetTestProperties props = new ParquetTestProperties(1, 60175, DEFAULT_BYTES_PER_PAGE, fields); testParquetFullEngineEventBased(false, false, "/parquet/par_writer_test.json", null, - "unused, no file is generated", 1, props, false); + "unused, no file is generated", 1, props, QueryType.PHYSICAL); } + @Ignore + @Test + public void testReadError_Drill_839() throws Exception { + // select cast( L_COMMENT as varchar) from dfs.`/tmp/drilltest/employee_parquet` + HashMap<String, FieldInfo> fields = new HashMap<>(); + ParquetTestProperties props = new ParquetTestProperties(1, 150000, DEFAULT_BYTES_PER_PAGE, fields); + String readEntries = "\"/tmp/customer_nonull.parquet\""; + testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", readEntries, + "unused, no file is generated", 1, props, QueryType.LOGICAL); + } @Ignore @Test public void testReadBug_Drill_418() throws Exception { HashMap<String, FieldInfo> fields = new HashMap<>(); - ParquetTestProperties props = new ParquetTestProperties(5, 300000, DEFAULT_BYTES_PER_PAGE, fields); + ParquetTestProperties props = new ParquetTestProperties(1, 150000, DEFAULT_BYTES_PER_PAGE, fields); TestFileGenerator.populateDrill_418_fields(props); String readEntries = "\"/tmp/customer.plain.parquet\""; testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", 
readEntries, - "unused, no file is generated", 1, props, true); + "unused, no file is generated", 1, props, QueryType.LOGICAL); } // requires binary file generated by pig from TPCH data, also have to disable assert where data is coming in @@ -378,35 +412,35 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ @Test public void testMultipleRowGroupsAndReadsPigError() throws Exception { HashMap<String, FieldInfo> fields = new HashMap<>(); - ParquetTestProperties props = new ParquetTestProperties(5, 300000, DEFAULT_BYTES_PER_PAGE, fields); + ParquetTestProperties props = new ParquetTestProperties(1, 1500000, DEFAULT_BYTES_PER_PAGE, fields); TestFileGenerator.populatePigTPCHCustomerFields(props); String readEntries = "\"/tmp/tpc-h/customer\""; testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", readEntries, - "unused, no file is generated", 1, props, true); + "unused, no file is generated", 1, props, QueryType.LOGICAL); fields = new HashMap(); - props = new ParquetTestProperties(5, 300000, DEFAULT_BYTES_PER_PAGE, fields); + props = new ParquetTestProperties(1, 100000, DEFAULT_BYTES_PER_PAGE, fields); TestFileGenerator.populatePigTPCHSupplierFields(props); readEntries = "\"/tmp/tpc-h/supplier\""; testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", readEntries, - "unused, no file is generated", 1, props, true); + "unused, no file is generated", 1, props, QueryType.LOGICAL); } @Ignore @Test public void drill_958bugTest() throws Exception { HashMap<String, FieldInfo> fields = new HashMap<>(); - ParquetTestProperties props = new ParquetTestProperties(5, 300000, DEFAULT_BYTES_PER_PAGE, fields); + ParquetTestProperties props = new ParquetTestProperties(1, 2880404, DEFAULT_BYTES_PER_PAGE, fields); TestFileGenerator.populatePigTPCHCustomerFields(props); String readEntries = "\"/tmp/store_sales\""; testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", readEntries, - "unused, no file is generated", 1, props, true); + "unused, no file is generated", 1, props, QueryType.LOGICAL); } @Test public void testMultipleRowGroupsEvent() throws Exception { HashMap<String, FieldInfo> fields = new HashMap<>(); - ParquetTestProperties props = new ParquetTestProperties(4, 3000, DEFAULT_BYTES_PER_PAGE, fields); + ParquetTestProperties props = new ParquetTestProperties(2, 300, DEFAULT_BYTES_PER_PAGE, fields); populateFieldInfoMap(props); testParquetFullEngineEventBased(true, "/parquet/parquet_scan_screen.json", "/tmp/test.parquet", 1, props); } @@ -434,7 +468,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ props.fields.put("bigInt", new FieldInfo("int64", "bigInt", 64, TestFileGenerator.longVals, TypeProtos.MinorType.BIGINT, props)); props.fields.put("bin", new FieldInfo("binary", "bin", -1, TestFileGenerator.binVals, TypeProtos.MinorType.VARBINARY, props)); props.fields.put("bin2", new FieldInfo("binary", "bin2", -1, TestFileGenerator.bin2Vals, TypeProtos.MinorType.VARBINARY, props)); - testParquetFullEngineEventBased(true, false, "/parquet/parquet_selective_column_read.json", null, "/tmp/test.parquet", 1, props, false); + testParquetFullEngineEventBased(true, false, "/parquet/parquet_selective_column_read.json", null, "/tmp/test.parquet", 1, props, QueryType.PHYSICAL); } public static void main(String[] args) throws Exception{ @@ -502,19 +536,19 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ public void 
testParquetFullEngineEventBased(boolean generateNew, String plan, String readEntries, String filename, int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception{ testParquetFullEngineEventBased(true, generateNew, plan, readEntries,filename, - numberOfTimesRead /* specified in json plan */, props, true); + numberOfTimesRead /* specified in json plan */, props, QueryType.LOGICAL); } // specific tests should call this method, but it is not marked as a test itself intentionally public void testParquetFullEngineEventBased(boolean generateNew, String plan, String filename, int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception{ - testParquetFullEngineEventBased(true, generateNew, plan, null, filename, numberOfTimesRead, props, true); + testParquetFullEngineEventBased(true, generateNew, plan, null, filename, numberOfTimesRead, props, QueryType.LOGICAL); } // specific tests should call this method, but it is not marked as a test itself intentionally public void testParquetFullEngineEventBased(boolean testValues, boolean generateNew, String plan, String readEntries, String filename, int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props, - boolean runAsLogicalPlan) throws Exception{ + QueryType queryType) throws Exception{ if (generateNew) TestFileGenerator.generateParquetFile(filename, props); ParquetResultListener resultListener = new ParquetResultListener(getAllocator(), props, numberOfTimesRead, testValues); @@ -524,11 +558,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ if (readEntries != null) { planText = planText.replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries); } - if (runAsLogicalPlan){ - this.testWithListener(QueryType.LOGICAL, planText, resultListener); - }else{ - this.testWithListener(QueryType.PHYSICAL, planText, resultListener); - } + this.testWithListener(queryType, planText, resultListener); resultListener.getResults(); long D = System.nanoTime(); System.out.println(String.format("Took %f s to run query", (float)(D-C) / 1E9)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java index 4a0efc9..a624234 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.store.parquet; import static junit.framework.Assert.assertEquals; +import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.HashMap; @@ -32,6 +33,7 @@ import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.user.ConnectionThrottle; import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.rpc.user.UserResultsListener; +import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.ValueVector; import com.google.common.base.Strings; @@ -60,7 +62,7 @@ public class ParquetResultListener implements UserResultsListener { @Override public void submissionFailed(RpcException ex) { - logger.debug("Submission failed.", ex); + 
logger.error("Submission failed.", ex); future.setException(ex); } @@ -76,7 +78,9 @@ public class ParquetResultListener implements UserResultsListener { return; } - T val = (T) valueVector.getAccessor().getObject(index); + T val; + try { + val = (T) valueVector.getAccessor().getObject(index); if (val instanceof byte[]) { assert(Arrays.equals((byte[]) value, (byte[]) val)); } @@ -85,6 +89,9 @@ public class ParquetResultListener implements UserResultsListener { } else { assertEquals(value, val); } + } catch (Throwable ex) { + throw ex; + } } @Override @@ -126,7 +133,15 @@ public class ParquetResultListener implements UserResultsListener { } for (int j = 0; j < vv.getAccessor().getValueCount(); j++) { if (ParquetRecordReaderTest.VERBOSE_DEBUG){ - System.out.print(Strings.padStart(vv.getAccessor().getObject(j) + "", 20, ' ') + " "); + Object o = vv.getAccessor().getObject(j); + if (o instanceof byte[]) { + try { + o = new String((byte[])o, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + System.out.print(Strings.padStart(o + "", 20, ' ') + " "); System.out.print(", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : "")); } if (testValues){ @@ -164,7 +179,27 @@ public class ParquetResultListener implements UserResultsListener { for (VectorWrapper vw : batchLoader) { ValueVector v = vw.getValueVector(); - System.out.print(Strings.padStart(v.getAccessor().getObject(i) + "", 20, ' ') + " "); + Object o = v.getAccessor().getObject(i); + if (o instanceof byte[]) { + try { + // TODO - in the dictionary read error test there is some data that does not look correct + // the output of our reader matches the values of the parquet-mr cat/head tools (no full comparison was made, + // but from a quick check of a few values it looked consistent) + // this might have gotten corrupted by pig somehow, or maybe this is just how the data is supposed to look + // TODO - check this!! +// for (int k = 0; k < ((byte[])o).length; k++ ) { +// // check that the value at each position is a valid single character ascii value.
+// +// if (((byte[])o)[k] > 128) { +// System.out.println("batch: " + batchCounter + " record: " + recordCount); +// } +// } + o = new String((byte[])o, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + System.out.print(Strings.padStart(o + "", 20, ' ') + " "); } System.out.println(); } @@ -174,7 +209,7 @@ public class ParquetResultListener implements UserResultsListener { if(result.getHeader().getIsLastChunk()){ // ensure the right number of columns was returned, especially important to ensure selective column read is working if (testValues) { - assertEquals( "Unexpected number of output columns from parquet scan.", valuesChecked.keySet().size(), props.fields.keySet().size() ); + assertEquals( "Unexpected number of output columns from parquet scan.", props.fields.keySet().size(), valuesChecked.keySet().size() ); } for (String s : valuesChecked.keySet()) { try { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java index 2d2a2ec..3c0287d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.parquet; import static parquet.column.Encoding.PLAIN; +import static parquet.column.Encoding.RLE; import java.util.HashMap; @@ -29,6 +30,7 @@ import org.apache.hadoop.fs.Path; import parquet.bytes.BytesInput; import parquet.column.ColumnDescriptor; +import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; import parquet.hadoop.ParquetFileWriter; import parquet.hadoop.metadata.CompressionCodecName; import parquet.schema.MessageType; @@ -37,8 +39,6 @@ import parquet.schema.MessageTypeParser; public class TestFileGenerator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestFileGenerator.class); - - // 10 mb per page static int bytesPerPage = 1024 * 1024 * 1; // { 00000001, 00000010, 00000100, 00001000, 00010000, ... 
} @@ -57,6 +57,9 @@ public class TestFileGenerator { static final Object[] binVals = { varLen1, varLen2, varLen3 }; static final Object[] bin2Vals = { varLen3, varLen2, varLen1 }; + // TODO - figure out what this should be set at, it should be based on the max nesting level + public static final int MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS = 16; + static void populateDrill_418_fields(ParquetTestProperties props){ props.fields.put("cust_key", new FieldInfo("int32", "integer", 32, intVals, TypeProtos.MinorType.INT, props)); @@ -102,6 +105,34 @@ public class TestFileGenerator { props.fields.put("S_COMMENT", new FieldInfo("binary", "bin2", -1, bin2Vals, TypeProtos.MinorType.VARBINARY, props)); } + private static abstract class ValueProducer { + + public abstract void reset(); + public abstract Object getValue(); + } + + private static class ValueRepeaterProducer extends ValueProducer { + + WrapAroundCounter position; + Object[] values; + + public ValueRepeaterProducer(Object[] values) { + this.values = values; + position = new WrapAroundCounter(values.length); + } + + @Override + public void reset() { + position.reset(); + } + + public Object getValue() { + Object ret = values[position.val]; + position.increment(); + return ret; + } + } + public static void generateParquetFile(String filename, ParquetTestProperties props) throws Exception { int currentBooleanByte = 0; @@ -133,7 +164,7 @@ public class TestFileGenerator { HashMap<String, Integer> columnValuesWritten = new HashMap(); int valsWritten; for (int k = 0; k < props.numberRowGroups; k++){ - w.startBlock(1); + w.startBlock(props.recordsPerRowGroup); currentBooleanByte = 0; booleanBitCounter.reset(); @@ -152,6 +183,8 @@ public class TestFileGenerator { w.startColumn(c1, props.recordsPerRowGroup, codec); int valsPerPage = (int) Math.ceil(props.recordsPerRowGroup / (float) fieldInfo.numberOfPages); byte[] bytes; + RunLengthBitPackingHybridValuesWriter defLevels = new RunLengthBitPackingHybridValuesWriter(MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, valsPerPage); + RunLengthBitPackingHybridValuesWriter repLevels = new RunLengthBitPackingHybridValuesWriter(MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, valsPerPage); // for variable length binary fields int bytesNeededToEncodeLength = 4; if ((int) fieldInfo.bitLength > 0) { @@ -169,6 +202,8 @@ public class TestFileGenerator { int bytesWritten = 0; for (int z = 0; z < (int) fieldInfo.numberOfPages; z++, bytesWritten = 0) { for (int i = 0; i < valsPerPage; i++) { + repLevels.writeInteger(0); + defLevels.writeInteger(1); //System.out.print(i + ", " + (i % 25 == 0 ? 
"\n gen " + fieldInfo.name + ": " : "")); if (fieldInfo.values[0] instanceof Boolean) { @@ -195,7 +230,13 @@ public class TestFileGenerator { } } - w.writeDataPage((int) (props.recordsPerRowGroup / (int) fieldInfo.numberOfPages), bytes.length, BytesInput.from(bytes), PLAIN, PLAIN, PLAIN); + byte[] fullPage = new byte[2 * 4 * valsPerPage + bytes.length]; + byte[] repLevelBytes = repLevels.getBytes().toByteArray(); + byte[] defLevelBytes = defLevels.getBytes().toByteArray(); + System.arraycopy(bytes, 0, fullPage, 0, bytes.length); + System.arraycopy(repLevelBytes, 0, fullPage, bytes.length, repLevelBytes.length); + System.arraycopy(defLevelBytes, 0, fullPage, bytes.length + repLevelBytes.length, defLevelBytes.length); + w.writeDataPage( (props.recordsPerRowGroup / fieldInfo.numberOfPages), fullPage.length, BytesInput.from(fullPage), RLE, RLE, PLAIN); currentBooleanByte = 0; } w.endColumn(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/test/resources/parquet/alltypes_repeated.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/alltypes_repeated.json b/exec/java-exec/src/test/resources/parquet/alltypes_repeated.json new file mode 100644 index 0000000..927cb52 --- /dev/null +++ b/exec/java-exec/src/test/resources/parquet/alltypes_repeated.json @@ -0,0 +1,28 @@ +{ + "TINYINT_col" : [ 1, 2, 3, 4, 5, -1, -2, -3, -4, -5, 10000, -10000 ], + "UINT1_col" : [ 1, 2, 3, 4, 5, 10000 ], + "UINT2_col" : [ 1, 2, 3, 4, 5, 10000 ], + SMALLINT_col" : [ 1, 2, 3, 4, 5, -1, -2, -3, -4, -5, 100000, -100000 ], + "INT_col" : [ 1, 2, 3, 4, 5, -1, -2, -3, -4, -5, 2147483647, -2147483648 ], + "UINT4_col" : [ 1, 2, 3, 4, 5, 2147483700 ], + "FLOAT4_col" : [ 1.0, 2.0, 3.0, 4.0, 5.0, 1000000000000.0, -1000000000000.0 ], + "TIME_col" : [ "2:30, "11:45", "12:00", 11:59", 23:59" ], + "DECIMAL9_col" : [ "1.0", "2.0", "3.0", "4.0", "5.0", "100.100", "0.0000001" ], + "BIGINT_col" : [ 1, 2, 3, 4, 5, 9223372036854775000, -9223372036854775000], + "UINT8_col" : [ "1", "2", "3", "4", "5", "9223372036854778000" ], + "FLOAT8_col" : [ 1.0, 2.0, 3.0, 4.0, 5.0, 10000000000000.0, -10000000000000.0 ], + "DATE_col": [ "1995-01-01", "1995-01-02", "1995-01-03", "1995-01-04", "1995-01-05" ], + "TIMESTAMP_col" : [ "1995-01-01 01:00:00.000","1995-01-01 01:00:00.000", "1995-01-01 01:00:00.000", "1995-01-01 01:00:00.000" ], + "DECIMAL18_col" : ["123456789.000000000", "11.123456789", "0.100000000", "-0.100400000", "-987654321.123456789", "-2.030100000"], + "INTERVALYEAR" : + "INTERVALDAY" : + "INTERVAL" : + "DECIMAL28DENSE_col", + "DECIMAL38DENSE_col", + "DECIMAL38SPARSE_col", + "DECIMAL28SPARSE_col", + "VARBINARY_col", + "VARCHAR_col", + "VAR16CHAR_col", + "BIT_col", +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/test/resources/parquet/basic_repeated.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/basic_repeated.json b/exec/java-exec/src/test/resources/parquet/basic_repeated.json new file mode 100644 index 0000000..ae39685 --- /dev/null +++ b/exec/java-exec/src/test/resources/parquet/basic_repeated.json @@ -0,0 +1,9 @@ +{ + "int32": [1,2] +} +{ + "int32": [] +} +{ + "int32": [1] +}

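For readers tracing the TestFileGenerator change above, the following is a minimal sketch of the new page-assembly step read in isolation. It reuses only calls that already appear in the diff (the two-argument RunLengthBitPackingHybridValuesWriter constructor, writeInteger, getBytes().toByteArray(), and BytesInput.from); the class name PageAssemblySketch, the method assemblePage, and the exact-size buffer are illustrative additions, not part of the commit.

import parquet.bytes.BytesInput;
import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;

public class PageAssemblySketch {

  // mirrors MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS introduced in TestFileGenerator
  static final int MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS = 16;

  // Builds the byte payload of one data page the way the committed test generator does:
  // plain-encoded values first, then RLE-encoded repetition levels, then definition levels.
  static BytesInput assemblePage(byte[] plainEncodedValues, int valsPerPage) throws Exception {
    RunLengthBitPackingHybridValuesWriter repLevels =
        new RunLengthBitPackingHybridValuesWriter(MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, valsPerPage);
    RunLengthBitPackingHybridValuesWriter defLevels =
        new RunLengthBitPackingHybridValuesWriter(MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, valsPerPage);
    for (int i = 0; i < valsPerPage; i++) {
      repLevels.writeInteger(0); // flat records: no repetition
      defLevels.writeInteger(1); // every slot carries a value
    }
    byte[] rep = repLevels.getBytes().toByteArray();
    byte[] def = defLevels.getBytes().toByteArray();
    byte[] fullPage = new byte[plainEncodedValues.length + rep.length + def.length];
    System.arraycopy(plainEncodedValues, 0, fullPage, 0, plainEncodedValues.length);
    System.arraycopy(rep, 0, fullPage, plainEncodedValues.length, rep.length);
    System.arraycopy(def, 0, fullPage, plainEncodedValues.length + rep.length, def.length);
    return BytesInput.from(fullPage);
  }
}

The result is what the diff passes to w.writeDataPage(..., RLE, RLE, PLAIN). The Parquet v1 data page layout conventionally stores repetition and definition levels ahead of the values; the values-first ordering above simply mirrors the committed test code, so treat this as documentation of the change rather than a general-purpose page writer.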