DRILL-930: Support date read/write in parquet.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f1c6b984
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f1c6b984
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f1c6b984

Branch: refs/heads/master
Commit: f1c6b9849ebffff592f940e6c03e0e48b2bf7a85
Parents: 8404068
Author: Jason Altekruse <[email protected]>
Authored: Sun Jun 8 16:50:55 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Tue Jun 10 18:59:39 2014 -0700

----------------------------------------------------------------------
 .../templates/ParquetOutputRecordWriter.java    | 23 ++++++++++++++-
 .../codegen/templates/ParquetTypeHelper.java    |  2 +-
 .../store/parquet/FixedByteAlignedReader.java   | 22 ++++++++++++++
 .../store/parquet/NullableColumnReader.java     |  3 +-
 .../parquet/NullableFixedByteAlignedReader.java | 30 ++++++++++++++++++++
 .../exec/store/parquet/ParquetRecordReader.java |  8 ++++--
 .../physical/impl/writer/TestParquetWriter.java | 22 +++++++++++++-
 7 files changed, 103 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index 5f75c1c..07bd449 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
+import org.joda.time.DateTimeUtils;
 import parquet.io.api.Binary;
 
 import java.lang.Override;
 
@@ -38,6 +39,14 @@ import parquet.io.api.RecordConsumer;
 import parquet.schema.MessageType;
 import parquet.io.api.Binary;
 import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+
+
+import org.apache.drill.common.types.TypeProtos;
+
+import org.joda.time.DateTimeUtils;
 
 import java.io.IOException;
 import java.lang.UnsupportedOperationException;
@@ -57,6 +66,7 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
 
   private RecordConsumer consumer;
   private MessageType schema;
+  public static final long JULIAN_DAY_EPOC = DateTimeUtils.toJulianDayNumber(0);
 
   public void setUp(MessageType schema, RecordConsumer consumer) {
     this.schema = schema;
@@ -106,7 +116,6 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
         minor.class == "BigInt" ||
         minor.class == "Decimal18" ||
         minor.class == "TimeStamp" ||
-        minor.class == "Date" ||
         minor.class == "UInt8">
   <#if mode.prefix == "Repeated" >
     consumer.addLong(valueHolder.vector.getAccessor().get(i));
@@ -115,6 +124,15 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
     consumer.addLong(valueHolder.value);
     consumer.endField(schema.getFieldName(fieldId), fieldId);
   </#if>
+  <#elseif minor.class == "Date">
+  <#if mode.prefix == "Repeated" >
+    consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(valueHolder.vector.getAccessor().get(i)) + JULIAN_DAY_EPOC));
+  <#else>
+    consumer.startField(schema.getFieldName(fieldId), fieldId);
+    // convert from Drill's internal date format to a Julian day number centered around the Unix epoch
+    consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(valueHolder.value) + JULIAN_DAY_EPOC));
+    consumer.endField(schema.getFieldName(fieldId), fieldId);
+  </#if>
 <#elseif
         minor.class == "Float8">
   <#if mode.prefix == "Repeated" >
@@ -139,6 +157,9 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
   <#if mode.prefix == "Repeated" >
   <#else>
   consumer.startField(schema.getFieldName(fieldId), fieldId);
+  ${minor.class}Vector tempVec = new ${minor.class}Vector(MaterializedField.create("", TypeProtos.MajorType.getDefaultInstance()), new TopLevelAllocator());
+  tempVec.allocateNew(10);
+  tempVec.getMutator().setSafe(0, valueHolder);
   byte[] bytes = DecimalUtility.getBigDecimalFromSparse(
           valueHolder.buffer, valueHolder.start, ${minor.class}Holder.nDecimalDigits, valueHolder.scale).unscaledValue().toByteArray();
   byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})];

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java b/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
index b268d33..15830f6 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
@@ -56,6 +56,7 @@ public class ParquetTypeHelper {
         minor.class == "Time" ||
         minor.class == "IntervalYear" ||
         minor.class == "Decimal9" ||
+        minor.class == "Date" ||
         minor.class == "UInt4">
             typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.INT32);
 <#elseif
@@ -65,7 +66,6 @@ public class ParquetTypeHelper {
         minor.class == "BigInt" ||
         minor.class == "Decimal18" ||
         minor.class == "TimeStamp" ||
-        minor.class == "Date" ||
         minor.class == "UInt8">
             typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.INT64);
 <#elseif

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
index 574b0cb..26e1f09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
@@ -19,11 +19,15 @@ package org.apache.drill.exec.store.parquet;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.expr.holders.DateHolder;
 import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.DateVector;
 import org.apache.drill.exec.vector.Decimal28SparseVector;
 import org.apache.drill.exec.vector.Decimal38SparseVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.joda.time.DateTimeUtils;
 import parquet.column.ColumnDescriptor;
 import parquet.format.ConvertedType;
 import parquet.format.SchemaElement;
@@ -92,6 +96,24 @@ class FixedByteAlignedReader extends ColumnReader {
     abstract void addNext(int start, int index);
   }
 
+  public static class DateReader extends ConvertedReader {
+
+    DateVector dateVector;
+
+    DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+               boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      dateVector = (DateVector) v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(
+          NullableFixedByteAlignedReader.NullableDateReader.readIntLittleEndian(bytes, start)
+          - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+    }
+  }
+
   public static class Decimal28Reader extends ConvertedReader {
 
     Decimal28SparseVector decimal28Vector;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
index 88a382a..eeb0344 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
@@ -74,8 +74,7 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
           lastValueWasNull = true;
           nullsFound = 0;
           if (currentValueIndexInVector == recordsToReadInThisPass
-              || currentValueIndexInVector >= valueVec.getValueCapacity()
-              || pageReadStatus.readPosInBytes >= pageReadStatus.byteLength){
+              || currentValueIndexInVector >= valueVec.getValueCapacity()) {
             break;
           }
           while(currentValueIndexInVector < recordsToReadInThisPass

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
index d4416c8..17759d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
@@ -21,10 +21,14 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.util.DecimalUtility;
 import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.NullableDateVector;
 import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
 import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.joda.time.DateTimeUtils;
 
+import parquet.bytes.BytesUtils;
 import parquet.column.ColumnDescriptor;
 import parquet.format.ConvertedType;
 import parquet.format.SchemaElement;
@@ -86,6 +90,32 @@ class NullableFixedByteAlignedReader extends NullableColumnReader {
     abstract void addNext(int start, int index);
   }
 
+  public static class NullableDateReader extends NullableConvertedReader {
+
+    NullableDateVector dateVector;
+
+    NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                       boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      dateVector = (NullableDateVector) v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytes, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+    }
+
+    // copied out of the parquet library; didn't want to deal with the unneeded throws statement it declares
+    public static int readIntLittleEndian(byte[] in, int offset) {
+      int ch4 = in[offset] & 0xff;
+      int ch3 = in[offset + 1] & 0xff;
+      int ch2 = in[offset + 2] & 0xff;
+      int ch1 = in[offset + 3] & 0xff;
+      return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+    }
+
+  }
+
   public static class NullableDecimal28Reader extends NullableConvertedReader {
 
     NullableDecimal28SparseVector decimal28Vector;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 4c5f4bb..70560e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -321,8 +321,9 @@ public class ParquetRecordReader implements RecordReader {
         } else if (length <= 16) {
           columnStatuses.add(new Decimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
         }
-      }
-      else{
+      } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
+        columnStatuses.add(new FixedByteAlignedReader.DateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
+      } else{
         if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
           columnStatuses.add(new ParquetFixedWidthDictionaryReader(this, allocateSize, descriptor,
               columnChunkMetaData, fixedLength, v, schemaElement));
@@ -337,6 +338,8 @@ public class ParquetRecordReader implements RecordReader {
 
       if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
         columnStatuses.add(new NullableBitReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
+      } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
+        columnStatuses.add(new NullableFixedByteAlignedReader.NullableDateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
       } else if (columnChunkMetaData.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
         int length = schemaElement.type_length;
         if (length <= 12) {
@@ -411,6 +414,7 @@ public class ParquetRecordReader implements RecordReader {
     return toMajorType(primitiveTypeName, 0, mode, schemaElement);
   }
 
+  // TODO - move this into ParquetTypeHelper and use code generation to create the list
   static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
                                           TypeProtos.DataMode mode, SchemaElement schemaElement) {
     ConvertedType convertedType = schemaElement.getConverted_type();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 8afcd7e..9febc58 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -83,6 +83,27 @@ public class TestParquetWriter extends BaseTestQuery {
   }
 
   @Test
+  public void testTPCHReadWrite1_date_convertedType() throws Exception {
+    String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
+        "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as COMMITDATE, cast(L_RECEIPTDATE as DATE) AS RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+    String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
+        "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, COMMITDATE, RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+    String inputTable = "cp.`tpch/lineitem.parquet`";
+    runTestAndValidate(selection, validationSelection, inputTable, "lineitem_parquet");
+  }
+
+  // TODO - file a JIRA for running this query with the projected column names the same as the originals; it failed with a deadbuf
+  // on the client, and it appeared that the projection was sending batches out with a record count but a deadbuf
+  /*
+  String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
+      "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as L_COMMITDATE, cast(L_RECEIPTDATE as DATE) AS L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+  String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
+      "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, COMMITDATE, RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+  */
+  // This is rather odd: I can select the data out of parquet and project it to cast the date fields.
+  // This stores all of the data correctly, but when I go to read it out again with the query that created it
+  // (with redundant casts, I believe), it has everything but the cast date columns as nulls.
+
   @Test
   public void testTPCHReadWrite2() throws Exception {
     String inputTable = "cp.`tpch/customer.parquet`";
     runTestAndValidate("*", "*", inputTable, "customer_parquet");
@@ -152,7 +173,6 @@ public class TestParquetWriter extends BaseTestQuery {
 
   @Test
-  @Ignore //enable once Date is enabled
   public void testDate() throws Exception {
     String selection = "cast(hire_date as DATE) as hire_date";
     String validateSelection = "hire_date";
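
For reference, the date conversion introduced above round-trips as follows. This is a minimal standalone sketch, not part of the patch: the class name and sample value are invented, and it assumes only joda-time on the classpath. The write side mirrors ParquetOutputRecordWriter and the read side mirrors NullableDateReader; the extra 0.5 subtracted on the read side compensates for DateTimeUtils.fromJulianDay() expecting a fractional Julian day that rolls over at noon, whereas toJulianDayNumber() returns a whole day number.

import org.joda.time.DateTimeUtils;

public class DateRoundTripSketch {  // hypothetical class, for illustration only

  // same value as ParquetOutputRecordWriter.JULIAN_DAY_EPOC:
  // the Julian day number of the Unix epoch, 1970-01-01
  static final long JULIAN_DAY_EPOC = DateTimeUtils.toJulianDayNumber(0);

  public static void main(String[] args) {
    // Drill's internal date format: milliseconds since the Unix epoch, midnight-aligned
    long dateMillis = 1076371200000L; // 2004-02-10T00:00:00Z

    // write path: Julian day number of the date, shifted by the epoch's own day number
    int stored = (int) (DateTimeUtils.toJulianDayNumber(dateMillis) + JULIAN_DAY_EPOC);

    // read path: undo the shift, then subtract 0.5 so fromJulianDay() lands on
    // midnight of that day rather than noon
    long readBack = DateTimeUtils.fromJulianDay(stored - JULIAN_DAY_EPOC - 0.5);

    System.out.println(readBack == dateMillis); // prints true
  }
}

Note that on the wire this is still just an INT32 column with the DATE converted type; the Julian-day shift is purely a convention shared between Drill's writer and its readers.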

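Similarly, a quick check of the byte order assumed by the readIntLittleEndian() helper that both date readers use to pull INT32 values out of the page buffer. The class name and the sample bytes are invented for illustration; the decode logic is copied from the diff above.

public class LittleEndianCheck {  // hypothetical class, for illustration only

  // identical logic to NullableDateReader.readIntLittleEndian():
  // reassemble a 4-byte int with the least significant byte first
  public static int readIntLittleEndian(byte[] in, int offset) {
    int ch4 = in[offset] & 0xff;      // lowest byte
    int ch3 = in[offset + 1] & 0xff;
    int ch2 = in[offset + 2] & 0xff;
    int ch1 = in[offset + 3] & 0xff;  // highest byte
    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
  }

  public static void main(String[] args) {
    byte[] page = { 0x4C, (byte) 0xAE, 0x4A, 0x00 }; // little-endian 0x004AAE4C
    System.out.println(readIntLittleEndian(page, 0)); // prints 4894284
  }
}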