PARQUET-358: Add support for Avro's logical types API. This adds support for Avro's logical types API to parquet-avro.
* The logical types API was introduced in Avro 1.8.0, so this bumps the Avro dependency version to 1.8.0. * Types supported are: decimal, date, time-millis, time-micros, timestamp-millis, and timestamp-micros * Tests have been copied from Avro and ported to the parquet-avro API Author: Ryan Blue <b...@apache.org> Closes #318 from rdblue/PARQUET-358-add-avro-logical-types-api and squashes the following commits: bd81f9c [Ryan Blue] PARQUET-358: Fix review items. 0a882ee [Ryan Blue] PARQUET-358: Add logical types circular reference test. 5124618 [Ryan Blue] PARQUET-358: Add license documentation for code from Avro. dcb14be [Ryan Blue] PARQUET-358: Add support for Avro's logical types API. Conflicts: parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java Resolution: Binary fixed in last commit, kept changes in AvroWriteSupport. Minor import changes in AvroRecordConverter. 
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/36e14294 Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/36e14294 Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/36e14294 Branch: refs/heads/parquet-1.8.x Commit: 36e14294235616a5658719e49e98a525856438af Parents: 2282c22 Author: Ryan Blue <b...@apache.org> Authored: Wed Apr 20 08:41:22 2016 -0700 Committer: Ryan Blue <b...@apache.org> Committed: Mon Jan 9 16:54:54 2017 -0800 ---------------------------------------------------------------------- LICENSE | 8 + NOTICE | 11 + parquet-avro/pom.xml | 4 - .../avro/AvroIndexedRecordConverter.java | 18 +- .../apache/parquet/avro/AvroReadSupport.java | 4 +- .../parquet/avro/AvroRecordConverter.java | 121 +++- .../parquet/avro/AvroSchemaConverter.java | 147 ++-- .../apache/parquet/avro/AvroWriteSupport.java | 165 +++-- .../parquet/avro/ParentValueContainer.java | 175 +++++ .../src/main/resources/META-INF/LICENSE | 186 +++++ parquet-avro/src/main/resources/META-INF/NOTICE | 18 + .../org/apache/parquet/avro/AvroTestUtil.java | 53 ++ .../parquet/avro/TestAvroSchemaConverter.java | 278 +++++++- .../parquet/avro/TestCircularReferences.java | 383 ++++++++++ .../parquet/avro/TestGenericLogicalTypes.java | 271 +++++++ .../org/apache/parquet/avro/TestReadWrite.java | 118 +++- .../avro/TestReadWriteOldListBehavior.java | 1 - .../parquet/avro/TestReflectLogicalTypes.java | 705 +++++++++++++++++++ .../java/org/apache/parquet/schema/Types.java | 12 +- .../org/apache/parquet/io/api/TestBinary.java | 10 - pom.xml | 1 + 21 files changed, 2553 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/LICENSE ---------------------------------------------------------------------- diff --git a/LICENSE b/LICENSE index b759148..b006581 100644 --- a/LICENSE +++ b/LICENSE @@ -178,6 +178,14 
@@ -------------------------------------------------------------------------------- +This product includes code from Apache Avro. + +Copyright: 2014 The Apache Software Foundation. +Home page: https://avro.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This project includes code from Daniel Lemire's JavaFastPFOR project. The "Lemire" bit packing source code produced by parquet-generator is derived from the JavaFastPFOR project. http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/NOTICE ---------------------------------------------------------------------- diff --git a/NOTICE b/NOTICE index c6e3bf2..a9b6c56 100644 --- a/NOTICE +++ b/NOTICE @@ -43,3 +43,14 @@ with the following copyright notice: See the License for the specific language governing permissions and limitations under the License. +-------------------------------------------------------------------------------- + +This product includes code from Apache Avro, which includes the following in +its NOTICE file: + + Apache Avro + Copyright 2010-2015 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). 
+ http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/parquet-avro/pom.xml ---------------------------------------------------------------------- diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml index 9434343..b34838f 100644 --- a/parquet-avro/pom.xml +++ b/parquet-avro/pom.xml @@ -32,10 +32,6 @@ <name>Apache Parquet Avro</name> <url>https://parquet.apache.org</url> - <properties> - <avro.version>1.7.6</avro.version> - </properties> - <dependencies> <dependency> <groupId>org.apache.parquet</groupId> http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java index 06c66d6..48eab4d 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java @@ -21,6 +21,8 @@ package org.apache.parquet.avro; import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.Map; +import org.apache.avro.Conversion; +import org.apache.avro.LogicalType; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericData; @@ -111,6 +113,11 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter @SuppressWarnings("unchecked") private static <T> Class<T> getDatumClass(GenericData model, Schema schema) { + if (model.getConversionFor(schema.getLogicalType()) != null) { + // use generic classes to pass data to conversions + return null; + } + if (model instanceof SpecificData) { return (Class<T>) ((SpecificData) model).getClass(schema); } @@ -133,7 +140,16 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends 
GroupConverter } private static Converter newConverter(Schema schema, Type type, - GenericData model, ParentValueContainer parent) { + GenericData model, ParentValueContainer setter) { + + LogicalType logicalType = schema.getLogicalType(); + // the expected type is always null because it is determined by the parent + // datum class, which never helps for generic. when logical types are added + // to specific, this should pass the expected type here. + Conversion<?> conversion = model.getConversionFor(logicalType); + ParentValueContainer parent = ParentValueContainer + .getConversionContainer(setter, conversion, schema); + if (schema.getType().equals(Schema.Type.BOOLEAN)) { return new AvroConverters.FieldBooleanConverter(parent); } else if (schema.getType().equals(Schema.Type.INT)) { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java index e73e8af..7d55bf5 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java @@ -110,9 +110,9 @@ public class AvroReadSupport<T> extends ReadSupport<T> { MessageType parquetSchema = readContext.getRequestedSchema(); Schema avroSchema; - if (readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY) != null) { + if (metadata.get(AVRO_READ_SCHEMA_METADATA_KEY) != null) { // use the Avro read schema provided by the user - avroSchema = new Schema.Parser().parse(readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY)); + avroSchema = new Schema.Parser().parse(metadata.get(AVRO_READ_SCHEMA_METADATA_KEY)); } else if (keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY) != null) { // use the Avro schema 
from the file metadata if present avroSchema = new Schema.Parser().parse(keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY)); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index 61d7d8e..30ed929 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -29,13 +29,20 @@ import it.unimi.dsi.fastutil.shorts.ShortArrayList; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.LinkedHashMap; +import org.apache.avro.AvroTypeException; +import org.apache.avro.Conversion; +import org.apache.avro.LogicalType; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import org.apache.avro.reflect.AvroIgnore; +import org.apache.avro.reflect.AvroName; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.Stringable; import org.apache.avro.specific.SpecificData; @@ -67,7 +74,8 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { private static final String JAVA_CLASS_PROP = "java-class"; private static final String JAVA_KEY_CLASS_PROP = "java-key-class"; - protected T currentRecord; + protected T currentRecord = null; + private ParentValueContainer rootContainer = null; private final Converter[] converters; private final Schema avroSchema; @@ -78,6 +86,15 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { public 
AvroRecordConverter(MessageType parquetSchema, Schema avroSchema, GenericData baseModel) { this(null, parquetSchema, avroSchema, baseModel); + LogicalType logicalType = avroSchema.getLogicalType(); + Conversion<?> conversion = baseModel.getConversionFor(logicalType); + this.rootContainer = ParentValueContainer.getConversionContainer(new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + public void add(Object value) { + AvroRecordConverter.this.currentRecord = (T) value; + } + }, conversion, avroSchema); } public AvroRecordConverter(ParentValueContainer parent, @@ -99,6 +116,8 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { recordClass = getDatumClass(avroSchema, model); } + Map<String, Class<?>> fields = getFieldsByName(recordClass, false); + int parquetFieldIndex = 0; for (Type parquetField: parquetSchema.getFields()) { final Schema.Field avroField = getAvroField(parquetField.getName()); @@ -110,8 +129,10 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { AvroRecordConverter.this.set(avroField.name(), finalAvroIndex, value); } }; + + Class<?> fieldClass = fields.get(avroField.name()); converters[parquetFieldIndex] = newConverter( - nonNullSchema, parquetField, this.model, container); + nonNullSchema, parquetField, this.model, fieldClass, container); // @Stringable doesn't affect the reflected schema; must be enforced here if (recordClass != null && @@ -145,6 +166,43 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { } } + // this was taken from Avro's ReflectData + private static Map<String, Class<?>> getFieldsByName(Class<?> recordClass, + boolean excludeJava) { + Map<String, Class<?>> fields = new LinkedHashMap<String, Class<?>>(); + + if (recordClass != null) { + Class<?> current = recordClass; + do { + if (excludeJava && current.getPackage() != null + && current.getPackage().getName().startsWith("java.")) { + break; // skip java built-in classes + } + for (Field 
field : current.getDeclaredFields()) { + if (field.isAnnotationPresent(AvroIgnore.class) || + isTransientOrStatic(field)) { + continue; + } + AvroName altName = field.getAnnotation(AvroName.class); + Class<?> existing = fields.put( + altName != null ? altName.value() : field.getName(), + field.getType()); + if (existing != null) { + throw new AvroTypeException( + current + " contains two fields named: " + field.getName()); + } + } + current = current.getSuperclass(); + } while (current != null); + } + + return fields; + } + + private static boolean isTransientOrStatic(Field field) { + return (field.getModifiers() & (Modifier.TRANSIENT | Modifier.STATIC)) != 0; + } + private Schema.Field getAvroField(String parquetFieldName) { Schema.Field avroField = avroSchema.getField(parquetFieldName); if (avroField != null) { @@ -162,12 +220,28 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { parquetFieldName)); } + private static Converter newConverter( + Schema schema, Type type, GenericData model, ParentValueContainer setter) { + return newConverter(schema, type, model, null, setter); + } + private static Converter newConverter(Schema schema, Type type, - GenericData model, ParentValueContainer parent) { + GenericData model, Class<?> knownClass, ParentValueContainer setter) { + LogicalType logicalType = schema.getLogicalType(); + Conversion<?> conversion; + if (knownClass != null) { + conversion = model.getConversionByClass(knownClass, logicalType); + } else { + conversion = model.getConversionFor(logicalType); + } + + ParentValueContainer parent = ParentValueContainer + .getConversionContainer(setter, conversion, schema); + if (schema.getType().equals(Schema.Type.BOOLEAN)) { return new AvroConverters.FieldBooleanConverter(parent); } else if (schema.getType().equals(Schema.Type.INT)) { - Class<?> datumClass = getDatumClass(schema, model); + Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model); if (datumClass == null) { return 
new AvroConverters.FieldIntegerConverter(parent); } else if (datumClass == byte.class || datumClass == Byte.class) { @@ -185,7 +259,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { } else if (schema.getType().equals(Schema.Type.DOUBLE)) { return new AvroConverters.FieldDoubleConverter(parent); } else if (schema.getType().equals(Schema.Type.BYTES)) { - Class<?> datumClass = getDatumClass(schema, model); + Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model); if (datumClass == null) { return new AvroConverters.FieldByteBufferConverter(parent); } else if (datumClass.isArray() && datumClass.getComponentType() == byte.class) { @@ -199,7 +273,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { } else if (schema.getType().equals(Schema.Type.ENUM)) { return new AvroConverters.FieldEnumConverter(parent, schema, model); } else if (schema.getType().equals(Schema.Type.ARRAY)) { - Class<?> datumClass = getDatumClass(schema, model); + Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model); if (datumClass != null && datumClass.isArray()) { return new AvroArrayConverter( parent, type.asGroupType(), schema, model, datumClass); @@ -263,8 +337,24 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { } } - @SuppressWarnings("unchecked") private static <T> Class<T> getDatumClass(Schema schema, GenericData model) { + return getDatumClass(null, null, schema, model); + } + + @SuppressWarnings("unchecked") + private static <T> Class<T> getDatumClass(Conversion<?> conversion, + Class<T> knownClass, + Schema schema, GenericData model) { + if (conversion != null) { + // use generic classes to pass data to conversions + return null; + } + + // known class can be set when using reflect + if (knownClass != null) { + return knownClass; + } + if (model instanceof SpecificData) { // this works for reflect as well return ((SpecificData) model).getClass(schema); @@ -312,6 +402,9 @@ 
class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { fillInDefaults(); if (parent != null) { parent.add(currentRecord); + } else { + // this applies any converters needed for the root value + rootContainer.add(currentRecord); } } @@ -500,10 +593,10 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { // matching it against the element schema. if (isElementType(repeatedType, elementSchema)) { // the element type is the repeated type (and required) - converter = newConverter(elementSchema, repeatedType, model, setter); + converter = newConverter(elementSchema, repeatedType, model, elementClass, setter); } else { // the element is wrapped in a synthetic group and may be optional - converter = new PrimitiveElementConverter( + converter = new ArrayElementConverter( repeatedType.asGroupType(), elementSchema, model, setter); } } @@ -641,20 +734,20 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { * } * </pre> */ - final class PrimitiveElementConverter extends GroupConverter { + final class ArrayElementConverter extends GroupConverter { private boolean isSet; private final Converter elementConverter; - public PrimitiveElementConverter(GroupType repeatedType, - Schema elementSchema, GenericData model, - final ParentValueContainer setter) { + public ArrayElementConverter(GroupType repeatedType, + Schema elementSchema, GenericData model, + final ParentValueContainer setter) { Type elementType = repeatedType.getType(0); Preconditions.checkArgument( !elementClass.isPrimitive() || elementType.isRepetition(REQUIRED), "Cannot convert list of optional elements to primitive array"); Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema); this.elementConverter = newConverter( - nonNullElementSchema, elementType, model, new ParentValueContainer() { + nonNullElementSchema, elementType, model, elementClass, new ParentValueContainer() { @Override public void add(Object value) { isSet = true; 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 6cfa8d1..6b9b94c 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -18,20 +18,26 @@ */ package org.apache.parquet.avro; -import java.util.*; - +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; -import org.codehaus.jackson.node.NullNode; import org.apache.parquet.schema.ConversionPatterns; +import org.apache.parquet.schema.DecimalMetadata; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import static org.apache.avro.JsonProperties.NULL_VALUE; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT; import static org.apache.parquet.schema.OriginalType.*; @@ -113,26 +119,28 @@ public class AvroSchemaConverter { return convertField(fieldName, schema, Type.Repetition.REQUIRED); } + @SuppressWarnings("deprecation") private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) { + Types.PrimitiveBuilder<PrimitiveType> builder; Schema.Type 
type = schema.getType(); if (type.equals(Schema.Type.BOOLEAN)) { - return primitive(fieldName, BOOLEAN, repetition); + builder = Types.primitive(BOOLEAN, repetition); } else if (type.equals(Schema.Type.INT)) { - return primitive(fieldName, INT32, repetition); + builder = Types.primitive(INT32, repetition); } else if (type.equals(Schema.Type.LONG)) { - return primitive(fieldName, INT64, repetition); + builder = Types.primitive(INT64, repetition); } else if (type.equals(Schema.Type.FLOAT)) { - return primitive(fieldName, FLOAT, repetition); + builder = Types.primitive(FLOAT, repetition); } else if (type.equals(Schema.Type.DOUBLE)) { - return primitive(fieldName, DOUBLE, repetition); + builder = Types.primitive(DOUBLE, repetition); } else if (type.equals(Schema.Type.BYTES)) { - return primitive(fieldName, BINARY, repetition); + builder = Types.primitive(BINARY, repetition); } else if (type.equals(Schema.Type.STRING)) { - return primitive(fieldName, BINARY, repetition, UTF8); + builder = Types.primitive(BINARY, repetition).as(UTF8); } else if (type.equals(Schema.Type.RECORD)) { return new GroupType(repetition, fieldName, convertFields(schema.getFields())); } else if (type.equals(Schema.Type.ENUM)) { - return primitive(fieldName, BINARY, repetition, ENUM); + builder = Types.primitive(BINARY, repetition).as(ENUM); } else if (type.equals(Schema.Type.ARRAY)) { if (writeOldListStructure) { return ConversionPatterns.listType(repetition, fieldName, @@ -146,16 +154,36 @@ public class AvroSchemaConverter { // avro map key type is always string return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType); } else if (type.equals(Schema.Type.FIXED)) { - return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition, - schema.getFixedSize(), null); + builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition) + .length(schema.getFixedSize()); } else if (type.equals(Schema.Type.UNION)) { return convertUnion(fieldName, schema, repetition); + } else { + throw new 
UnsupportedOperationException("Cannot convert Avro type " + type); } - throw new UnsupportedOperationException("Cannot convert Avro type " + type); + + // schema translation can only be done for known logical types because this + // creates an equivalence + LogicalType logicalType = schema.getLogicalType(); + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Decimal) { + builder = builder.as(DECIMAL) + .precision(((LogicalTypes.Decimal) logicalType).getPrecision()) + .scale(((LogicalTypes.Decimal) logicalType).getScale()); + + } else { + OriginalType annotation = convertLogicalType(logicalType); + if (annotation != null) { + builder.as(annotation); + } + } + } + + return builder.named(fieldName); } private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) { - List<Schema> nonNullSchemas = new ArrayList(schema.getTypes().size()); + List<Schema> nonNullSchemas = new ArrayList<Schema>(schema.getTypes().size()); for (Schema childSchema : schema.getTypes()) { if (childSchema.getType().equals(Schema.Type.NULL)) { if (Type.Repetition.REQUIRED == repetition) { @@ -175,7 +203,7 @@ public class AvroSchemaConverter { return convertField(fieldName, nonNullSchemas.get(0), repetition); default: // complex union type - List<Type> unionTypes = new ArrayList(nonNullSchemas.size()); + List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size()); int index = 0; for (Schema childSchema : nonNullSchemas) { unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL)); @@ -188,24 +216,6 @@ public class AvroSchemaConverter { return convertField(field.name(), field.schema()); } - private PrimitiveType primitive(String name, - PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition, - int typeLength, OriginalType originalType) { - return new PrimitiveType(repetition, primitive, typeLength, name, - originalType); - } - - private PrimitiveType primitive(String name, - 
PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition, - OriginalType originalType) { - return new PrimitiveType(repetition, primitive, name, originalType); - } - - private PrimitiveType primitive(String name, - PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition) { - return new PrimitiveType(repetition, primitive, name, null); - } - public Schema convert(MessageType parquetSchema) { return convertFields(parquetSchema.getName(), parquetSchema.getFields()); } @@ -217,10 +227,11 @@ public class AvroSchemaConverter { if (parquetType.isRepetition(REPEATED)) { throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType); } else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) { - fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null, - NullNode.getInstance())); + fields.add(new Schema.Field( + parquetType.getName(), optional(fieldSchema), null, NULL_VALUE)); } else { // REQUIRED - fields.add(new Schema.Field(parquetType.getName(), fieldSchema, null, null)); + fields.add(new Schema.Field( + parquetType.getName(), fieldSchema, null, (Object) null)); } } Schema schema = Schema.createRecord(name, null, null, false); @@ -230,10 +241,11 @@ public class AvroSchemaConverter { private Schema convertField(final Type parquetType) { if (parquetType.isPrimitive()) { + final PrimitiveType asPrimitive = parquetType.asPrimitiveType(); final PrimitiveTypeName parquetPrimitiveTypeName = - parquetType.asPrimitiveType().getPrimitiveTypeName(); - final OriginalType originalType = parquetType.getOriginalType(); - return parquetPrimitiveTypeName.convert( + asPrimitive.getPrimitiveTypeName(); + final OriginalType annotation = parquetType.getOriginalType(); + Schema schema = parquetPrimitiveTypeName.convert( new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() { @Override public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) { @@ -266,13 +278,24 @@ 
public class AvroSchemaConverter { } @Override public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) { - if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) { + if (annotation == OriginalType.UTF8 || annotation == OriginalType.ENUM) { return Schema.create(Schema.Type.STRING); } else { return Schema.create(Schema.Type.BYTES); } } }); + + LogicalType logicalType = convertOriginalType( + annotation, asPrimitive.getDecimalMetadata()); + if (logicalType != null && (annotation != DECIMAL || + parquetPrimitiveTypeName == BINARY || + parquetPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY)) { + schema = logicalType.addToSchema(schema); + } + + return schema; + } else { GroupType parquetGroupType = parquetType.asGroupType(); OriginalType originalType = parquetGroupType.getOriginalType(); @@ -335,6 +358,46 @@ public class AvroSchemaConverter { } } + private OriginalType convertLogicalType(LogicalType logicalType) { + if (logicalType == null) { + return null; + } else if (logicalType instanceof LogicalTypes.Decimal) { + return OriginalType.DECIMAL; + } else if (logicalType instanceof LogicalTypes.Date) { + return OriginalType.DATE; + } else if (logicalType instanceof LogicalTypes.TimeMillis) { + return OriginalType.TIME_MILLIS; + } else if (logicalType instanceof LogicalTypes.TimeMicros) { + return OriginalType.TIME_MICROS; + } else if (logicalType instanceof LogicalTypes.TimestampMillis) { + return OriginalType.TIMESTAMP_MILLIS; + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + return OriginalType.TIMESTAMP_MICROS; + } + return null; + } + + private LogicalType convertOriginalType(OriginalType annotation, DecimalMetadata meta) { + if (annotation == null) { + return null; + } + switch (annotation) { + case DECIMAL: + return LogicalTypes.decimal(meta.getPrecision(), meta.getScale()); + case DATE: + return LogicalTypes.date(); + case TIME_MILLIS: + return LogicalTypes.timeMillis(); + case TIME_MICROS: + return 
LogicalTypes.timeMicros(); + case TIMESTAMP_MILLIS: + return LogicalTypes.timestampMillis(); + case TIMESTAMP_MICROS: + return LogicalTypes.timestampMicros(); + } + return null; + } + /** * Implements the rules for interpreting existing data from the logical type * spec for the LIST annotation. This is used to produce the expected schema. http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java index 29dc9a1..460565b 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java @@ -23,6 +23,8 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.avro.Conversion; +import org.apache.avro.LogicalType; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericFixed; @@ -69,6 +71,8 @@ public class AvroWriteSupport<T> extends WriteSupport<T> { private RecordConsumer recordConsumer; private MessageType rootSchema; private Schema rootAvroSchema; + private LogicalType rootLogicalType; + private Conversion<?> rootConversion; private GenericData model; private ListWriter listWriter; @@ -82,6 +86,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> { public AvroWriteSupport(MessageType schema, Schema avroSchema) { this.rootSchema = schema; this.rootAvroSchema = avroSchema; + this.rootLogicalType = rootAvroSchema.getLogicalType(); this.model = null; } @@ -89,6 +94,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> { GenericData model) { this.rootSchema = schema; this.rootAvroSchema = avroSchema; + this.rootLogicalType = 
rootAvroSchema.getLogicalType(); this.model = model; } @@ -136,16 +142,25 @@ public class AvroWriteSupport<T> extends WriteSupport<T> { // overloaded version for backward compatibility @SuppressWarnings("unchecked") public void write(IndexedRecord record) { - recordConsumer.startMessage(); - writeRecordFields(rootSchema, rootAvroSchema, record); - recordConsumer.endMessage(); + write((T) record); } @Override public void write(T record) { - recordConsumer.startMessage(); - writeRecordFields(rootSchema, rootAvroSchema, record); - recordConsumer.endMessage(); + if (rootLogicalType != null) { + Conversion<?> conversion = model.getConversionByClass( + record.getClass(), rootLogicalType); + + recordConsumer.startMessage(); + writeRecordFields(rootSchema, rootAvroSchema, + convert(rootAvroSchema, rootLogicalType, conversion, record)); + recordConsumer.endMessage(); + + } else { + recordConsumer.startMessage(); + writeRecordFields(rootSchema, rootAvroSchema, record); + recordConsumer.endMessage(); + } } private void writeRecord(GroupType schema, Schema avroSchema, @@ -226,6 +241,8 @@ public class AvroWriteSupport<T> extends WriteSupport<T> { } } + // TODO: what if the value is null? + // Sparsely populated method of encoding unions, each member has its own // set of columns. String memberName = "member" + parquetIndex; @@ -237,44 +254,108 @@ public class AvroWriteSupport<T> extends WriteSupport<T> { recordConsumer.endGroup(); } - @SuppressWarnings("unchecked") + /** + * Calls an appropriate write method based on the value. + * Value MUST not be null. 
+ * + * @param type the Parquet type + * @param avroSchema the Avro schema + * @param value a non-null value to write + */ private void writeValue(Type type, Schema avroSchema, Object value) { Schema nonNullAvroSchema = AvroSchemaConverter.getNonNull(avroSchema); - Schema.Type avroType = nonNullAvroSchema.getType(); - if (avroType.equals(Schema.Type.BOOLEAN)) { - recordConsumer.addBoolean((Boolean) value); - } else if (avroType.equals(Schema.Type.INT)) { - if (value instanceof Character) { - recordConsumer.addInteger((Character) value); - } else { - recordConsumer.addInteger(((Number) value).intValue()); - } - } else if (avroType.equals(Schema.Type.LONG)) { - recordConsumer.addLong(((Number) value).longValue()); - } else if (avroType.equals(Schema.Type.FLOAT)) { - recordConsumer.addFloat(((Number) value).floatValue()); - } else if (avroType.equals(Schema.Type.DOUBLE)) { - recordConsumer.addDouble(((Number) value).doubleValue()); - } else if (avroType.equals(Schema.Type.BYTES)) { - if (value instanceof byte[]) { - recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value)); - } else { - recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value)); - } - } else if (avroType.equals(Schema.Type.STRING)) { - recordConsumer.addBinary(fromAvroString(value)); - } else if (avroType.equals(Schema.Type.RECORD)) { - writeRecord(type.asGroupType(), nonNullAvroSchema, value); - } else if (avroType.equals(Schema.Type.ENUM)) { - recordConsumer.addBinary(Binary.fromString(value.toString())); - } else if (avroType.equals(Schema.Type.ARRAY)) { - listWriter.writeList(type.asGroupType(), nonNullAvroSchema, value); - } else if (avroType.equals(Schema.Type.MAP)) { - writeMap(type.asGroupType(), nonNullAvroSchema, (Map<CharSequence, ?>) value); - } else if (avroType.equals(Schema.Type.UNION)) { - writeUnion(type.asGroupType(), nonNullAvroSchema, value); - } else if (avroType.equals(Schema.Type.FIXED)) { - 
recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes())); + LogicalType logicalType = nonNullAvroSchema.getLogicalType(); + if (logicalType != null) { + Conversion<?> conversion = model.getConversionByClass( + value.getClass(), logicalType); + writeValueWithoutConversion(type, nonNullAvroSchema, + convert(nonNullAvroSchema, logicalType, conversion, value)); + } else { + writeValueWithoutConversion(type, nonNullAvroSchema, value); + } + } + + private <D> Object convert(Schema schema, LogicalType logicalType, + Conversion<D> conversion, Object datum) { + if (conversion == null) { + return datum; + } + Class<D> fromClass = conversion.getConvertedType(); + switch (schema.getType()) { + case RECORD: return conversion.toRecord(fromClass.cast(datum), schema, logicalType); + case ENUM: return conversion.toEnumSymbol(fromClass.cast(datum), schema, logicalType); + case ARRAY: return conversion.toArray(fromClass.cast(datum), schema, logicalType); + case MAP: return conversion.toMap(fromClass.cast(datum), schema, logicalType); + case FIXED: return conversion.toFixed(fromClass.cast(datum), schema, logicalType); + case STRING: return conversion.toCharSequence(fromClass.cast(datum), schema, logicalType); + case BYTES: return conversion.toBytes(fromClass.cast(datum), schema, logicalType); + case INT: return conversion.toInt(fromClass.cast(datum), schema, logicalType); + case LONG: return conversion.toLong(fromClass.cast(datum), schema, logicalType); + case FLOAT: return conversion.toFloat(fromClass.cast(datum), schema, logicalType); + case DOUBLE: return conversion.toDouble(fromClass.cast(datum), schema, logicalType); + case BOOLEAN: return conversion.toBoolean(fromClass.cast(datum), schema, logicalType); + } + return datum; + } + + /** + * Calls an appropriate write method based on the value. + * Value must not be null and the schema must not be nullable. 
+ * + * @param type a Parquet type + * @param avroSchema a non-nullable Avro schema + * @param value a non-null value to write + */ + @SuppressWarnings("unchecked") + private void writeValueWithoutConversion(Type type, Schema avroSchema, Object value) { + switch (avroSchema.getType()) { + case BOOLEAN: + recordConsumer.addBoolean((Boolean) value); + break; + case INT: + if (value instanceof Character) { + recordConsumer.addInteger((Character) value); + } else { + recordConsumer.addInteger(((Number) value).intValue()); + } + break; + case LONG: + recordConsumer.addLong(((Number) value).longValue()); + break; + case FLOAT: + recordConsumer.addFloat(((Number) value).floatValue()); + break; + case DOUBLE: + recordConsumer.addDouble(((Number) value).doubleValue()); + break; + case FIXED: + recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes())); + break; + case BYTES: + if (value instanceof byte[]) { + recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value)); + } else { + recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value)); + } + break; + case STRING: + recordConsumer.addBinary(fromAvroString(value)); + break; + case RECORD: + writeRecord(type.asGroupType(), avroSchema, value); + break; + case ENUM: + recordConsumer.addBinary(Binary.fromString(value.toString())); + break; + case ARRAY: + listWriter.writeList(type.asGroupType(), avroSchema, value); + break; + case MAP: + writeMap(type.asGroupType(), avroSchema, (Map<CharSequence, ?>) value); + break; + case UNION: + writeUnion(type.asGroupType(), avroSchema, value); + break; } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java index 
67b710d..f36f5fc 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java @@ -18,6 +18,16 @@ */ package org.apache.parquet.avro; +import org.apache.avro.Conversion; +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericEnumSymbol; +import org.apache.avro.generic.IndexedRecord; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; + abstract class ParentValueContainer { /** @@ -60,4 +70,169 @@ abstract class ParentValueContainer { add(value); } + static class LogicalTypePrimitiveContainer extends ParentValueContainer { + private final ParentValueContainer wrapped; + private final Schema schema; + private final LogicalType logicalType; + private final Conversion conversion; + + public LogicalTypePrimitiveContainer(ParentValueContainer wrapped, + Schema schema, Conversion conversion) { + this.wrapped = wrapped; + this.schema = schema; + this.logicalType = schema.getLogicalType(); + this.conversion = conversion; + } + + @Override + public void addDouble(double value) { + wrapped.add(conversion.fromDouble(value, schema, logicalType)); + } + + @Override + public void addFloat(float value) { + wrapped.add(conversion.fromFloat(value, schema, logicalType)); + } + + @Override + public void addLong(long value) { + wrapped.add(conversion.fromLong(value, schema, logicalType)); + } + + @Override + public void addInt(int value) { + wrapped.add(conversion.fromInt(value, schema, logicalType)); + } + + @Override + public void addShort(short value) { + wrapped.add(conversion.fromInt((int) value, schema, logicalType)); + } + + @Override + public void addChar(char value) { + wrapped.add(conversion.fromInt((int) value, schema, logicalType)); + } + + @Override + public void addByte(byte value) { + wrapped.add(conversion.fromInt((int) value, schema, 
logicalType)); + } + + @Override + public void addBoolean(boolean value) { + wrapped.add(conversion.fromBoolean(value, schema, logicalType)); + } + } + + static ParentValueContainer getConversionContainer( + final ParentValueContainer parent, final Conversion<?> conversion, + final Schema schema) { + if (conversion == null) { + return parent; + } + + final LogicalType logicalType = schema.getLogicalType(); + + switch (schema.getType()) { + case STRING: + return new ParentValueContainer() { + @Override + public void add(Object value) { + parent.add(conversion.fromCharSequence( + (CharSequence) value, schema, logicalType)); + } + }; + case BOOLEAN: + return new LogicalTypePrimitiveContainer(parent, schema, conversion) { + @Override + public void add(Object value) { + parent.add(conversion.fromBoolean( + (Boolean) value, schema, logicalType)); + } + }; + case INT: + return new LogicalTypePrimitiveContainer(parent, schema, conversion) { + @Override + public void add(Object value) { + parent.add(conversion.fromInt( + (Integer) value, schema, logicalType)); + } + }; + case LONG: + return new LogicalTypePrimitiveContainer(parent, schema, conversion) { + @Override + public void add(Object value) { + parent.add(conversion.fromLong( + (Long) value, schema, logicalType)); + } + }; + case FLOAT: + return new LogicalTypePrimitiveContainer(parent, schema, conversion) { + @Override + public void add(Object value) { + parent.add(conversion.fromFloat( + (Float) value, schema, logicalType)); + } + }; + case DOUBLE: + return new LogicalTypePrimitiveContainer(parent, schema, conversion) { + @Override + public void add(Object value) { + parent.add(conversion.fromDouble( + (Double) value, schema, logicalType)); + } + }; + case BYTES: + return new ParentValueContainer() { + @Override + public void add(Object value) { + parent.add(conversion.fromBytes( + (ByteBuffer) value, schema, logicalType)); + } + }; + case FIXED: + return new ParentValueContainer() { + @Override + public void 
add(Object value) { + parent.add(conversion.fromFixed( + (GenericData.Fixed) value, schema, logicalType)); + } + }; + case RECORD: + return new ParentValueContainer() { + @Override + public void add(Object value) { + parent.add(conversion.fromRecord( + (IndexedRecord) value, schema, logicalType)); + } + }; + case ARRAY: + return new ParentValueContainer() { + @Override + public void add(Object value) { + parent.add(conversion.fromArray( + (Collection<?>) value, schema, logicalType)); + } + }; + case MAP: + return new ParentValueContainer() { + @Override + public void add(Object value) { + parent.add(conversion.fromMap( + (Map<?, ?>) value, schema, logicalType)); + } + }; + case ENUM: + return new ParentValueContainer() { + @Override + public void add(Object value) { + parent.add(conversion.fromEnumSymbol( + (GenericEnumSymbol) value, schema, logicalType)); + } + }; + default: + return new LogicalTypePrimitiveContainer(parent, schema, conversion); + } + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/parquet-avro/src/main/resources/META-INF/LICENSE ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/resources/META-INF/LICENSE b/parquet-avro/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000..20b23c9 --- /dev/null +++ b/parquet-avro/src/main/resources/META-INF/LICENSE @@ -0,0 +1,186 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + +-------------------------------------------------------------------------------- + +This product includes code from Apache Avro. + +Copyright: 2014 The Apache Software Foundation. 
+Home page: https://avro.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/parquet-avro/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/resources/META-INF/NOTICE b/parquet-avro/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..7b5682c --- /dev/null +++ b/parquet-avro/src/main/resources/META-INF/NOTICE @@ -0,0 +1,18 @@ + +Apache Parquet MR (Incubating) +Copyright 2014-2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +-------------------------------------------------------------------------------- + +This product includes code from Apache Avro, which includes the following in +its NOTICE file: + + Apache Avro + Copyright 2010-2015 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). 
+ http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java index d5fe11a..f4682d6 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java @@ -19,11 +19,21 @@ package org.apache.parquet.avro; import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; import org.codehaus.jackson.node.NullNode; +import org.junit.Assert; +import org.junit.rules.TemporaryFolder; public class AvroTestUtil { @@ -66,4 +76,47 @@ public class AvroTestUtil { return record; } + public static <D> List<D> read(GenericData model, Schema schema, File file) throws IOException { + List<D> data = new ArrayList<D>(); + Configuration conf = new Configuration(false); + AvroReadSupport.setRequestedProjection(conf, schema); + AvroReadSupport.setAvroReadSchema(conf, schema); + ParquetReader<D> fileReader = AvroParquetReader + .<D>builder(new Path(file.toString())) + .withDataModel(model) // reflect disables compatibility + .withConf(conf) + .build(); + + try { + D datum; + while ((datum = fileReader.read()) != null) { + data.add(datum); + } + } finally { + fileReader.close(); + } + + return data; + } + + @SuppressWarnings("unchecked") + public static <D> File write(TemporaryFolder temp, GenericData model, 
Schema schema, D... data) throws IOException { + File file = temp.newFile(); + Assert.assertTrue(file.delete()); + ParquetWriter<D> writer = AvroParquetWriter + .<D>builder(new Path(file.toString())) + .withDataModel(model) + .withSchema(schema) + .build(); + + try { + for (D datum : data) { + writer.write(datum); + } + } finally { + writer.close(); + } + + return file; + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/36e14294/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java index b393615..942e3b1 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java @@ -20,16 +20,37 @@ package org.apache.parquet.avro; import com.google.common.collect.Lists; import com.google.common.io.Resources; -import java.util.Arrays; -import java.util.Collections; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; import org.codehaus.jackson.node.NullNode; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; +import java.util.Arrays; +import java.util.Collections; +import static org.apache.avro.Schema.Type.INT; +import static org.apache.avro.Schema.Type.LONG; +import static org.apache.parquet.schema.OriginalType.DATE; +import static 
org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS; +import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS; +import static org.apache.parquet.schema.OriginalType.TIME_MICROS; +import static org.apache.parquet.schema.OriginalType.TIME_MILLIS; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.junit.Assert.assertEquals; public class TestAvroSchemaConverter { @@ -131,7 +152,7 @@ public class TestAvroSchemaConverter { @Test(expected = IllegalArgumentException.class) public void testTopLevelMustBeARecord() { - new AvroSchemaConverter().convert(Schema.create(Schema.Type.INT)); + new AvroSchemaConverter().convert(Schema.create(INT)); } @Test @@ -270,7 +291,7 @@ public class TestAvroSchemaConverter { @Test public void testOptionalFields() throws Exception { Schema schema = Schema.createRecord("record1", null, null, false); - Schema optionalInt = optional(Schema.create(Schema.Type.INT)); + Schema optionalInt = optional(Schema.create(INT)); schema.setFields(Arrays.asList( new Schema.Field("myint", optionalInt, null, NullNode.getInstance()) )); @@ -284,7 +305,7 @@ public class TestAvroSchemaConverter { @Test public void testOptionalMapValue() throws Exception { Schema schema = Schema.createRecord("record1", null, null, false); - Schema optionalIntMap = 
Schema.createMap(optional(Schema.create(Schema.Type.INT))); + Schema optionalIntMap = Schema.createMap(optional(Schema.create(INT))); schema.setFields(Arrays.asList( new Schema.Field("myintmap", optionalIntMap, null, null) )); @@ -303,7 +324,7 @@ public class TestAvroSchemaConverter { @Test public void testOptionalArrayElement() throws Exception { Schema schema = Schema.createRecord("record1", null, null, false); - Schema optionalIntArray = Schema.createArray(optional(Schema.create(Schema.Type.INT))); + Schema optionalIntArray = Schema.createArray(optional(Schema.create(INT))); schema.setFields(Arrays.asList( new Schema.Field("myintarray", optionalIntArray, null, null) )); @@ -323,7 +344,7 @@ public class TestAvroSchemaConverter { Schema schema = Schema.createRecord("record2", null, null, false); Schema multipleTypes = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type .NULL), - Schema.create(Schema.Type.INT), + Schema.create(INT), Schema.create(Schema.Type.FLOAT))); schema.setFields(Arrays.asList( new Schema.Field("myunion", multipleTypes, null, NullNode.getInstance()))); @@ -396,7 +417,7 @@ public class TestAvroSchemaConverter { @Test public void testOldAvroListOfLists() throws Exception { Schema listOfLists = optional(Schema.createArray(Schema.createArray( - Schema.create(Schema.Type.INT)))); + Schema.create(INT)))); Schema schema = Schema.createRecord("AvroCompatListInList", null, null, false); schema.setFields(Lists.newArrayList( new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance()) @@ -425,7 +446,7 @@ public class TestAvroSchemaConverter { @Test public void testOldThriftListOfLists() throws Exception { Schema listOfLists = optional(Schema.createArray(Schema.createArray( - Schema.create(Schema.Type.INT)))); + Schema.create(INT)))); Schema schema = Schema.createRecord("ThriftCompatListInList", null, null, false); schema.setFields(Lists.newArrayList( new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance()) @@ -458,7 
+479,7 @@ public class TestAvroSchemaConverter { // group's name, but it must be 2-level because the repeated group doesn't // contain an optional or repeated element as required for 3-level lists Schema listOfLists = optional(Schema.createArray(Schema.createArray( - Schema.create(Schema.Type.INT)))); + Schema.create(INT)))); Schema schema = Schema.createRecord("UnknownTwoLevelListInList", null, null, false); schema.setFields(Lists.newArrayList( new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance()) @@ -488,7 +509,7 @@ public class TestAvroSchemaConverter { @Test public void testParquetMapWithoutMapKeyValueAnnotation() throws Exception { Schema schema = Schema.createRecord("myrecord", null, null, false); - Schema map = Schema.createMap(Schema.create(Schema.Type.INT)); + Schema map = Schema.createMap(Schema.create(INT)); schema.setFields(Collections.singletonList(new Schema.Field("mymap", map, null, null))); String parquetSchema = "message myrecord {\n" + @@ -504,9 +525,240 @@ public class TestAvroSchemaConverter { testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema); } + @Test + public void testDecimalBytesType() throws Exception { + Schema schema = Schema.createRecord("myrecord", null, null, false); + Schema decimal = LogicalTypes.decimal(9, 2).addToSchema( + Schema.create(Schema.Type.BYTES)); + schema.setFields(Collections.singletonList( + new Schema.Field("dec", decimal, null, null))); + + testRoundTripConversion(schema, + "message myrecord {\n" + + " required binary dec (DECIMAL(9,2));\n" + + "}\n"); + } + + @Test + public void testDecimalFixedType() throws Exception { + Schema schema = Schema.createRecord("myrecord", null, null, false); + Schema decimal = LogicalTypes.decimal(9, 2).addToSchema( + Schema.createFixed("dec", null, null, 8)); + schema.setFields(Collections.singletonList( + new Schema.Field("dec", decimal, null, null))); + + testRoundTripConversion(schema, + "message myrecord {\n" + + " required 
fixed_len_byte_array(8) dec (DECIMAL(9,2));\n" + + "}\n"); + }
+
+  /**
+   * Checks that a Parquet int32 column annotated with DECIMAL(9,2) is
+   * expected to convert to a plain Avro INT field (no logical type).
+   */
+  @Test
+  public void testDecimalIntegerType() throws Exception {
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field(
+            "dec", Schema.create(INT), null, null)));
+
+    // the decimal portion is lost because it isn't valid in Avro
+    testParquetToAvroConversion(expected,
+        "message myrecord {\n" +
+        " required int32 dec (DECIMAL(9,2));\n" +
+        "}\n");
+  }
+
+  /**
+   * Checks that a Parquet int64 column annotated with DECIMAL(9,2) is
+   * expected to convert to a plain Avro LONG field (no logical type).
+   */
+  @Test
+  public void testDecimalLongType() throws Exception {
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("dec", Schema.create(LONG), null, null)));
+
+    // the decimal portion is lost because it isn't valid in Avro
+    testParquetToAvroConversion(expected,
+        "message myrecord {\n" +
+        " required int64 dec (DECIMAL(9,2));\n" +
+        "}\n");
+  }
+
+  /**
+   * Checks the DATE annotation: an Avro INT carrying the date logical type is
+   * paired with a Parquet int32 (DATE) column through testRoundTripConversion,
+   * and converting any other primitive annotated with DATE must throw
+   * IllegalArgumentException.
+   */
+  @Test
+  public void testDateType() throws Exception {
+    Schema date = LogicalTypes.date().addToSchema(Schema.create(INT));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("date", date, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+        " required int32 date (DATE);\n" +
+        "}\n");
+
+    // every primitive except INT32 must be rejected for DATE
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        // fixed-length arrays use the constructor with an extra int argument
+        // (presumably the byte length; 12 looks arbitrary here)
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", DATE);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", DATE);
+      }
+
+      // message names DATE, the annotation actually being tested
+      assertThrows("Should not allow DATE with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  /**
+   * Checks the TIME_MILLIS annotation: an Avro INT carrying the time-millis
+   * logical type is paired with a Parquet int32 (TIME_MILLIS) column through
+   * testRoundTripConversion, and converting any other primitive annotated
+   * with TIME_MILLIS must throw IllegalArgumentException.
+   */
+  @Test
+  public void testTimeMillisType() throws Exception {
+    Schema timeMillis = LogicalTypes.timeMillis().addToSchema(Schema.create(INT));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("time", timeMillis, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+        " required int32 time (TIME_MILLIS);\n" +
+        "}\n");
+
+    // every primitive except INT32 must be rejected for TIME_MILLIS
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MILLIS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MILLIS);
+      }
+
+      // message names TIME_MILLIS, the annotation actually being tested
+      assertThrows("Should not allow TIME_MILLIS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  /**
+   * Checks the TIME_MICROS annotation: an Avro LONG carrying the time-micros
+   * logical type is paired with a Parquet int64 (TIME_MICROS) column through
+   * testRoundTripConversion, and converting any other primitive annotated
+   * with TIME_MICROS must throw IllegalArgumentException.
+   */
+  @Test
+  public void testTimeMicrosType() throws Exception {
+    Schema timeMicros = LogicalTypes.timeMicros().addToSchema(Schema.create(LONG));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("time", timeMicros, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+        " required int64 time (TIME_MICROS);\n" +
+        "}\n");
+
+    // every primitive except INT64 must be rejected for TIME_MICROS
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MICROS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MICROS);
+      }
+
+      assertThrows("Should not allow TIME_MICROS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  /**
+   * Checks the TIMESTAMP_MILLIS annotation: an Avro LONG carrying the
+   * timestamp-millis logical type is paired with a Parquet int64
+   * (TIMESTAMP_MILLIS) column through testRoundTripConversion, and converting
+   * any other primitive annotated with TIMESTAMP_MILLIS must throw
+   * IllegalArgumentException.
+   */
+  @Test
+  public void testTimestampMillisType() throws Exception {
+    Schema timestampMillis =
+        LogicalTypes.timestampMillis().addToSchema(Schema.create(LONG));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("timestamp", timestampMillis, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+        " required int64 timestamp (TIMESTAMP_MILLIS);\n" +
+        "}\n");
+
+    // every primitive except INT64 must be rejected for TIMESTAMP_MILLIS
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MILLIS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MILLIS);
+      }
+
+      assertThrows("Should not allow TIMESTAMP_MILLIS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  /**
+   * Checks the TIMESTAMP_MICROS annotation: an Avro LONG carrying the
+   * timestamp-micros logical type is paired with a Parquet int64
+   * (TIMESTAMP_MICROS) column through testRoundTripConversion, and converting
+   * any other primitive annotated with TIMESTAMP_MICROS must throw
+   * IllegalArgumentException.
+   */
+  @Test
+  public void testTimestampMicrosType() throws Exception {
+    Schema timestampMicros =
+        LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("timestamp", timestampMicros, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+        " required int64 timestamp (TIMESTAMP_MICROS);\n" +
+        "}\n");
+
+    // every primitive except INT64 must be rejected for TIMESTAMP_MICROS
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MICROS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MICROS);
+      }
+
+      assertThrows("Should not allow TIMESTAMP_MICROS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  /**
+   * Returns a union schema whose branches are NULL followed by the given
+   * schema, in that order.
+   */
   public static Schema optional(Schema original) {
     return Schema.createUnion(Lists.newArrayList(
         Schema.create(Schema.Type.NULL),
         original));
   }
+
+  /**
+   * Builds a Parquet message type named "myrecord" whose only field is the
+   * given primitive.
+   *
+   * @param primitive the single field to place in the message
+   * @return the message type wrapping that field
+   */
+  public static MessageType message(PrimitiveType primitive) {
+    return Types.buildMessage()
+        .addField(primitive)
+        .named("myrecord");
+  }
+
+  /**
+   * A convenience method to avoid a large number of @Test(expected=...) tests.
+   * The assertion passes only when the runnable throws an exception whose
+   * class is exactly {@code expected}; a subclass does not match.
+   *
+   * @param message A String message to describe this assertion
+   * @param expected An Exception class that the Runnable should throw
+   * @param runnable A Runnable that is expected to throw the exception
+   */
+  public static void assertThrows(
+      String message, Class<? extends Exception> expected, Runnable runnable) {
+    try {
+      runnable.run();
+      // Presumably JUnit's Assert: fail() raises AssertionError, which is an
+      // Error rather than an Exception, so the catch below does not swallow it.
+      Assert.fail("No exception was thrown (" + message + "), expected: " +
+          expected.getName());
+    } catch (Exception actual) {
+      try {
+        Assert.assertEquals(message, expected, actual.getClass());
+      } catch (AssertionError e) {
+        // keep the unexpected exception attached so its stack trace is reported
+        e.addSuppressed(actual);
+        throw e;
+      }
+    }
+  }
 }