Repository: parquet-mr Updated Branches: refs/heads/master 0ed977ab4 -> cf991604d
PARQUET-755: create parquet-arrow module with schema converter Author: Julien Le Dem <[email protected]> Closes #381 from julienledem/parquet_arrow and squashes the following commits: 9792683 [Julien Le Dem] PARQUET-755: create parquet-arrow module with schema converter introduces SchemaMapping add repeated mapping Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/cf991604 Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/cf991604 Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/cf991604 Branch: refs/heads/master Commit: cf991604d75d446d02baddc536c7c05b43cd8dea Parents: 0ed977a Author: Julien Le Dem <[email protected]> Authored: Wed Nov 9 08:58:59 2016 -0800 Committer: Julien Le Dem <[email protected]> Committed: Wed Nov 9 08:58:59 2016 -0800 ---------------------------------------------------------------------- parquet-arrow/pom.xml | 96 +++ .../parquet/arrow/schema/List3Levels.java | 77 +++ .../parquet/arrow/schema/SchemaConverter.java | 642 +++++++++++++++++++ .../parquet/arrow/schema/SchemaMapping.java | 203 ++++++ .../arrow/schema/TestSchemaConverter.java | 343 ++++++++++ .../java/org/apache/parquet/schema/Types.java | 9 +- pom.xml | 1 + 7 files changed, 1367 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cf991604/parquet-arrow/pom.xml ---------------------------------------------------------------------- diff --git a/parquet-arrow/pom.xml b/parquet-arrow/pom.xml new file mode 100644 index 0000000..96981f8 --- /dev/null +++ b/parquet-arrow/pom.xml @@ -0,0 +1,96 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. 
The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet</artifactId> + <relativePath>../pom.xml</relativePath> + <version>1.9.1-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>parquet-arrow</artifactId> + <packaging>jar</packaging> + + <name>Apache Parquet Arrow</name> + <url>https://parquet.apache.org</url> + + <properties> + <arrow.version>0.1.0</arrow.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-format</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-encoding</artifactId> + <version>${project.version}</version> + 
</dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-column</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-column</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>${slf4j.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cf991604/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/List3Levels.java ---------------------------------------------------------------------- diff --git a/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/List3Levels.java b/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/List3Levels.java new file mode 100644 index 0000000..cf21cb1 --- /dev/null +++ b/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/List3Levels.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.arrow.schema; + +import static org.apache.parquet.schema.Type.Repetition.REPEATED; + +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.Type; + +/** + * Represents a standard 3 levels Parquet list + * (can be null, can contain nulls) + * - optional list + * - repeated content + * - optional element + */ +class List3Levels { + private final GroupType list; + private final GroupType repeated; + private final Type element; + + /** + * Will validate the structure of the list + * @param list the Parquet List + */ + public List3Levels(GroupType list) { + if (list.getOriginalType() != OriginalType.LIST || list.getFields().size() != 1) { + throw new IllegalArgumentException("invalid list type: " + list); + } + this.list = list; + Type repeatedField = list.getFields().get(0); + if (repeatedField.isPrimitive() || !repeatedField.isRepetition(REPEATED) || repeatedField.asGroupType().getFields().size() != 1) { + throw new IllegalArgumentException("invalid list type: " + list); + } + this.repeated = repeatedField.asGroupType(); + this.element = repeated.getFields().get(0); + } + + /** + * @return the root list element (an optional group with one child) + */ + public GroupType getList() { + return list; + } + + /** + * @return repeated level, single child of list + */ + public GroupType getRepeated() { + return repeated; + } + + /** + * @return the element level, single child of repeated. 
+ */ + public Type getElement() { + return element; + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cf991604/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java ---------------------------------------------------------------------- diff --git a/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java b/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java new file mode 100644 index 0000000..773f7c8 --- /dev/null +++ b/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java @@ -0,0 +1,642 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.arrow.schema; + +import static java.util.Arrays.asList; +import static org.apache.parquet.schema.OriginalType.DATE; +import static org.apache.parquet.schema.OriginalType.DECIMAL; +import static org.apache.parquet.schema.OriginalType.INTERVAL; +import static org.apache.parquet.schema.OriginalType.INT_16; +import static org.apache.parquet.schema.OriginalType.INT_32; +import static org.apache.parquet.schema.OriginalType.INT_64; +import static org.apache.parquet.schema.OriginalType.INT_8; +import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS; +import static org.apache.parquet.schema.OriginalType.TIME_MILLIS; +import static org.apache.parquet.schema.OriginalType.UINT_16; +import static org.apache.parquet.schema.OriginalType.UINT_32; +import static org.apache.parquet.schema.OriginalType.UINT_64; +import static org.apache.parquet.schema.OriginalType.UINT_8; +import static org.apache.parquet.schema.OriginalType.UTF8; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; +import static org.apache.parquet.schema.Type.Repetition.REPEATED; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.flatbuf.Precision; +import org.apache.arrow.flatbuf.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import 
org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeVisitor; +import org.apache.arrow.vector.types.pojo.ArrowType.Binary; +import org.apache.arrow.vector.types.pojo.ArrowType.Bool; +import org.apache.arrow.vector.types.pojo.ArrowType.Date; +import org.apache.arrow.vector.types.pojo.ArrowType.Decimal; +import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint; +import org.apache.arrow.vector.types.pojo.ArrowType.Int; +import org.apache.arrow.vector.types.pojo.ArrowType.Interval; +import org.apache.arrow.vector.types.pojo.ArrowType.Null; +import org.apache.arrow.vector.types.pojo.ArrowType.Struct_; +import org.apache.arrow.vector.types.pojo.ArrowType.Time; +import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp; +import org.apache.arrow.vector.types.pojo.ArrowType.Union; +import org.apache.arrow.vector.types.pojo.ArrowType.Utf8; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.parquet.arrow.schema.SchemaMapping.ListTypeMapping; +import org.apache.parquet.arrow.schema.SchemaMapping.PrimitiveTypeMapping; +import org.apache.parquet.arrow.schema.SchemaMapping.RepeatedTypeMapping; +import org.apache.parquet.arrow.schema.SchemaMapping.StructTypeMapping; +import org.apache.parquet.arrow.schema.SchemaMapping.TypeMapping; +import org.apache.parquet.arrow.schema.SchemaMapping.UnionTypeMapping; +import org.apache.parquet.schema.DecimalMetadata; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.Types.GroupBuilder; + +/** + * Logic to convert Parquet and Arrow Schemas back and forth and maintain the mapping + */ +public class 
SchemaConverter { + + /** + * For when we'll need this to be configurable + */ + public SchemaConverter() { + } + + /** + * Creates a Parquet Schema from an Arrow one and returns the mapping + * @param arrowSchema the provided Arrow Schema + * @return the mapping between the 2 + */ + public SchemaMapping fromArrow(Schema arrowSchema) { + List<Field> fields = arrowSchema.getFields(); + List<TypeMapping> parquetFields = fromArrow(fields); + MessageType parquetType = addToBuilder(parquetFields, Types.buildMessage()).named("root"); + return new SchemaMapping(arrowSchema, parquetType, parquetFields); + } + + private <T> GroupBuilder<T> addToBuilder(List<TypeMapping> parquetFields, GroupBuilder<T> builder) { + for (TypeMapping type : parquetFields) { + builder = builder.addField(type.getParquetType()); + } + return builder; + } + + private List<TypeMapping> fromArrow(List<Field> fields) { + List<TypeMapping> result = new ArrayList<>(fields.size()); + for (Field field : fields) { + result.add(fromArrow(field)); + } + return result; + } + + private TypeMapping fromArrow(final Field field) { + return fromArrow(field, field.getName()); + } + + /** + * @param field arrow field + * @param fieldName overrides field.getName() + * @return mapping + */ + private TypeMapping fromArrow(final Field field, final String fieldName) { + final List<Field> children = field.getChildren(); + return field.getType().accept(new ArrowTypeVisitor<TypeMapping>() { + + @Override + public TypeMapping visit(Null type) { + // TODO(PARQUET-757): null original type + return primitive(BINARY); + } + + @Override + public TypeMapping visit(Struct_ type) { + List<TypeMapping> parquetTypes = fromArrow(children); + return new StructTypeMapping(field, addToBuilder(parquetTypes, Types.buildGroup(OPTIONAL)).named(fieldName), parquetTypes); + } + + @Override + public TypeMapping visit(org.apache.arrow.vector.types.pojo.ArrowType.List type) { + if (children.size() != 1) { + throw new IllegalArgumentException("list 
fields must have exactly one child: " + field); + } + TypeMapping parquetChild = fromArrow(children.get(0), "element"); + GroupType list = Types.optionalList().element(parquetChild.getParquetType()).named(fieldName); + return new ListTypeMapping(field, new List3Levels(list), parquetChild); + } + + @Override + public TypeMapping visit(Union type) { + // TODO(PARQUET-756): add Union OriginalType + List<TypeMapping> parquetTypes = fromArrow(children); + return new UnionTypeMapping(field, addToBuilder(parquetTypes, Types.buildGroup(OPTIONAL)).named(fieldName), parquetTypes); + } + + @Override + public TypeMapping visit(Int type) { + boolean signed = type.getIsSigned(); + switch (type.getBitWidth()) { + case 8: + return primitive(INT32, signed ? INT_8 : UINT_8); + case 16: + return primitive(INT32, signed ? INT_16 : UINT_16); + case 32: + return primitive(INT32, signed ? INT_32 : UINT_32); + case 64: + return primitive(INT64, signed ? INT_64 : UINT_64); + default: + throw new IllegalArgumentException("Illegal int type: " + field); + } + } + + @Override + public TypeMapping visit(FloatingPoint type) { + switch (type.getPrecision()) { + case Precision.HALF: + // TODO(PARQUET-757): original type HalfFloat + return primitive(FLOAT); + case Precision.SINGLE: + return primitive(FLOAT); + case Precision.DOUBLE: + return primitive(DOUBLE); + default: + throw new IllegalArgumentException("Illegal float type: " + field); + } + } + + @Override + public TypeMapping visit(Utf8 type) { + return primitive(BINARY, UTF8); + } + + @Override + public TypeMapping visit(Binary type) { + return primitive(BINARY); + } + + @Override + public TypeMapping visit(Bool type) { + return primitive(BOOLEAN); + } + + /** + * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal + * @param type + * @return + */ + @Override + public TypeMapping visit(Decimal type) { + int precision = type.getPrecision(); + int scale = type.getScale(); + if (1 <= precision && precision <= 9) { 
+ return decimal(INT32, precision, scale); + } else if (1 <= precision && precision <= 18) { + return decimal(INT64, precision, scale); + } else { + // Better: FIXED_LENGTH_BYTE_ARRAY with length + return decimal(BINARY, precision, scale); + } + } + + @Override + public TypeMapping visit(Date type) { + return primitive(INT32, DATE); + } + + @Override + public TypeMapping visit(Time type) { + return primitive(INT32, TIME_MILLIS); + } + + @Override + public TypeMapping visit(Timestamp type) { + return primitive(INT64, TIMESTAMP_MILLIS); + } + + /** + * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval + */ + @Override + public TypeMapping visit(Interval type) { + // TODO(PARQUET-675): fix interval original types + return primitiveFLBA(12, INTERVAL); + } + + private TypeMapping mapping(PrimitiveType parquetType) { + return new PrimitiveTypeMapping(field, parquetType); + } + + private TypeMapping decimal(PrimitiveTypeName type, int precision, int scale) { + return mapping(Types.optional(type).as(DECIMAL).precision(precision).scale(scale).named(fieldName)); + } + + private TypeMapping primitive(PrimitiveTypeName type) { + return mapping(Types.optional(type).named(fieldName)); + } + + private TypeMapping primitive(PrimitiveTypeName type, OriginalType otype) { + return mapping(Types.optional(type).as(otype).named(fieldName)); + } + + private TypeMapping primitiveFLBA(int length, OriginalType otype) { + return mapping(Types.optional(FIXED_LEN_BYTE_ARRAY).length(length).as(otype).named(fieldName)); + } + }); + } + + /** + * Creates an Arrow Schema from a Parquet one and returns the mapping + * @param parquetSchema the provided Parquet Schema + * @return the mapping between the 2 + */ + public SchemaMapping fromParquet(MessageType parquetSchema) { + List<Type> fields = parquetSchema.getFields(); + List<TypeMapping> mappings = fromParquet(fields); + List<Field> arrowFields = fields(mappings); + return new SchemaMapping(new
Schema(arrowFields), parquetSchema, mappings); + } + + private List<Field> fields(List<TypeMapping> mappings) { + List<Field> result = new ArrayList<>(mappings.size()); + for (TypeMapping typeMapping : mappings) { + result.add(typeMapping.getArrowField()); + } + return result; + } + + private List<TypeMapping> fromParquet(List<Type> fields) { + List<TypeMapping> result = new ArrayList<>(fields.size()); + for (Type type : fields) { + result.add(fromParquet(type)); + } + return result; + } + + private TypeMapping fromParquet(Type type) { + return fromParquet(type, type.getName(), type.getRepetition()); + } + + /** + * @param type parquet type + * @param name overrides parquet.getName() + * @param repetition overrides parquet.getRepetition() + * @return the mapping + */ + private TypeMapping fromParquet(Type type, String name, Repetition repetition) { + if (repetition == REPEATED) { + // case where we have a repeated field that is not in a List/Map + TypeMapping child = fromParquet(type, null, REQUIRED); + Field arrowField = new Field(name, false, new ArrowType.List(), asList(child.getArrowField())); + return new RepeatedTypeMapping(arrowField, type, child); + } + if (type.isPrimitive()) { + return fromParquetPrimitive(type.asPrimitiveType(), name); + } else { + return fromParquetGroup(type.asGroupType(), name); + } + } + + /** + * @param type parquet types + * @param name overrides parquet.getName() + * @return the mapping + */ + private TypeMapping fromParquetGroup(GroupType type, String name) { + OriginalType ot = type.getOriginalType(); + if (ot == null) { + List<TypeMapping> typeMappings = fromParquet(type.getFields()); + Field arrowField = new Field(name, type.isRepetition(OPTIONAL), new Struct_(), fields(typeMappings)); + return new StructTypeMapping(arrowField, type, typeMappings); + } else { + switch (ot) { + case LIST: + List3Levels list3Levels = new List3Levels(type); + TypeMapping child = fromParquet(list3Levels.getElement(), null,
list3Levels.getElement().getRepetition()); + Field arrowField = new Field(name, type.isRepetition(OPTIONAL), new ArrowType.List(), asList(child.getArrowField())); + return new ListTypeMapping(arrowField, list3Levels, child); + default: + throw new UnsupportedOperationException("Unsupported type " + type); + } + } + } + + /** + * @param type parquet types + * @param name overrides parquet.getName() + * @return the mapping + */ + private TypeMapping fromParquetPrimitive(final PrimitiveType type, final String name) { + return type.getPrimitiveTypeName().convert(new PrimitiveType.PrimitiveTypeNameConverter<TypeMapping, RuntimeException>() { + + private TypeMapping field(ArrowType arrowType) { + Field field = new Field(name, type.isRepetition(OPTIONAL), arrowType, null); + return new PrimitiveTypeMapping(field, type); + } + + @Override + public TypeMapping convertFLOAT(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + return field(new ArrowType.FloatingPoint(Precision.SINGLE)); + } + + @Override + public TypeMapping convertDOUBLE(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + return field(new ArrowType.FloatingPoint(Precision.DOUBLE)); + } + + @Override + public TypeMapping convertINT32(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + OriginalType ot = type.getOriginalType(); + if (ot == null) { + return integer(32, true); + } + switch (ot) { + case INT_8: + return integer(8, true); + case INT_16: + return integer(16, true); + case INT_32: + return integer(32, true); + case UINT_8: + return integer(8, false); + case UINT_16: + return integer(16, false); + case UINT_32: + return integer(32, false); + case DECIMAL: + return decimal(type.getDecimalMetadata()); + case DATE: + return field(new ArrowType.Date()); + case TIMESTAMP_MICROS: + return field(new ArrowType.Timestamp(TimeUnit.MICROSECOND)); + case TIMESTAMP_MILLIS: + return field(new ArrowType.Timestamp(TimeUnit.MILLISECOND)); + case TIME_MILLIS: + return field(new 
ArrowType.Time()); + default: + case TIME_MICROS: + case INT_64: + case UINT_64: + case UTF8: + case ENUM: + case BSON: + case INTERVAL: + case JSON: + case LIST: + case MAP: + case MAP_KEY_VALUE: + throw new IllegalArgumentException("illegal type " + type); + } + } + + @Override + public TypeMapping convertINT64(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + OriginalType ot = type.getOriginalType(); + if (ot == null) { + return integer(64, true); + } + switch (ot) { + case INT_8: + return integer(8, true); + case INT_16: + return integer(16, true); + case INT_32: + return integer(32, true); + case INT_64: + return integer(64, true); + case UINT_8: + return integer(8, false); + case UINT_16: + return integer(16, false); + case UINT_32: + return integer(32, false); + case UINT_64: + return integer(64, false); + case DECIMAL: + return decimal(type.getDecimalMetadata()); + case DATE: + return field(new ArrowType.Date()); + case TIMESTAMP_MICROS: + return field(new ArrowType.Timestamp(TimeUnit.MICROSECOND)); + case TIMESTAMP_MILLIS: + return field(new ArrowType.Timestamp(TimeUnit.MILLISECOND)); + case TIME_MILLIS: + return field(new ArrowType.Time()); + default: + case TIME_MICROS: + case UTF8: + case ENUM: + case BSON: + case INTERVAL: + case JSON: + case LIST: + case MAP: + case MAP_KEY_VALUE: + throw new IllegalArgumentException("illegal type " + type); + } + } + + @Override + public TypeMapping convertINT96(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + // Possibly timestamp + return field(new ArrowType.Binary()); + } + + @Override + public TypeMapping convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + return field(new ArrowType.Binary()); + } + + @Override + public TypeMapping convertBOOLEAN(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + return field(new ArrowType.Bool()); + } + + @Override + public TypeMapping convertBINARY(PrimitiveTypeName primitiveTypeName) throws 
RuntimeException { + OriginalType ot = type.getOriginalType(); + if (ot == null) { + return field(new ArrowType.Binary()); + } + switch (ot) { + case UTF8: + return field(new ArrowType.Utf8()); + case DECIMAL: + return decimal(type.getDecimalMetadata()); + default: + throw new IllegalArgumentException("illegal type " + type); + } + } + + private TypeMapping decimal(DecimalMetadata decimalMetadata) { + return field(new ArrowType.Decimal(decimalMetadata.getPrecision(), decimalMetadata.getScale())); + } + + private TypeMapping integer(int width, boolean signed) { + return field(new ArrowType.Int(width, signed)); + } + }); + } + + /** + * Maps a Parquet and Arrow Schema + * For now does not validate primitive type compatibility + * @param arrowSchema + * @param parquetSchema + * @return the mapping between the 2 + */ + public SchemaMapping map(Schema arrowSchema, MessageType parquetSchema) { + List<TypeMapping> children = map(arrowSchema.getFields(), parquetSchema.getFields()); + return new SchemaMapping(arrowSchema, parquetSchema, children); + } + + private List<TypeMapping> map(List<Field> arrowFields, List<Type> parquetFields) { + if (arrowFields.size() != parquetFields.size()) { + throw new IllegalArgumentException("Can not map schemas as sizes differ: " + arrowFields + " != " + parquetFields); + } + List<TypeMapping> result = new ArrayList<>(arrowFields.size()); + for (int i = 0; i < arrowFields.size(); i++) { + Field arrowField = arrowFields.get(i); + Type parquetField = parquetFields.get(i); + result.add(map(arrowField, parquetField)); + } + return result; + } + + private TypeMapping map(final Field arrowField, final Type parquetField) { + return arrowField.getType().accept(new ArrowTypeVisitor<TypeMapping>() { + + @Override + public TypeMapping visit(Null type) { + if (!parquetField.isRepetition(OPTIONAL)) { + throw new IllegalArgumentException("Parquet type can't be null: " + parquetField); + } + return primitive(); + } + + @Override + public TypeMapping 
visit(Struct_ type) { + if (parquetField.isPrimitive()) { + throw new IllegalArgumentException("Parquet type not a group: " + parquetField); + } + GroupType groupType = parquetField.asGroupType(); + return new StructTypeMapping(arrowField, groupType, map(arrowField.getChildren(), groupType.getFields())); + } + + @Override + public TypeMapping visit(org.apache.arrow.vector.types.pojo.ArrowType.List type) { + if (arrowField.getChildren().size() != 1) { + throw new IllegalArgumentException("Invalid list type: " + type); + } + Field arrowChild = arrowField.getChildren().get(0); + if (parquetField.isRepetition(REPEATED)) { + return new RepeatedTypeMapping(arrowField, parquetField, map(arrowChild, parquetField)); + } + if (parquetField.isPrimitive()) { + throw new IllegalArgumentException("Parquet type not a group: " + parquetField); + } + List3Levels list3Levels = new List3Levels(parquetField.asGroupType()); + if (arrowField.getChildren().size() != 1) { + throw new IllegalArgumentException("invalid arrow list: " + arrowField); + } + return new ListTypeMapping(arrowField, list3Levels, map(arrowChild, list3Levels.getElement())); + } + + @Override + public TypeMapping visit(Union type) { + if (parquetField.isPrimitive()) { + throw new IllegalArgumentException("Parquet type not a group: " + parquetField); + } + GroupType groupType = parquetField.asGroupType(); + return new UnionTypeMapping(arrowField, groupType, map(arrowField.getChildren(), groupType.getFields())); + } + + @Override + public TypeMapping visit(Int type) { + return primitive(); + } + + @Override + public TypeMapping visit(FloatingPoint type) { + return primitive(); + } + + @Override + public TypeMapping visit(Utf8 type) { + return primitive(); + } + + @Override + public TypeMapping visit(Binary type) { + return primitive(); + } + + @Override + public TypeMapping visit(Bool type) { + return primitive(); + } + + @Override + public TypeMapping visit(Decimal type) { + return primitive(); + } + + @Override + 
public TypeMapping visit(Date type) { + return primitive(); + } + + @Override + public TypeMapping visit(Time type) { + return primitive(); + } + + @Override + public TypeMapping visit(Timestamp type) { + return primitive(); + } + + @Override + public TypeMapping visit(Interval type) { + return primitive(); + } + + private TypeMapping primitive() { + if (!parquetField.isPrimitive()) { + throw new IllegalArgumentException("Can not map schemas as one is primitive and the other is not: " + arrowField + " != " + parquetField); + } + return new PrimitiveTypeMapping(arrowField, parquetField.asPrimitiveType()); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cf991604/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaMapping.java ---------------------------------------------------------------------- diff --git a/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaMapping.java b/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaMapping.java new file mode 100644 index 0000000..184d7c6 --- /dev/null +++ b/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaMapping.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.arrow.schema; + +import static java.util.Arrays.asList; + +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +/** + * The mapping between an Arrow schema and a Parquet schema + * @see SchemaConverter + * + * @author Julien Le Dem + */ +public class SchemaMapping { + + private final Schema arrowSchema; + private final MessageType parquetSchema; + private final List<TypeMapping> children; + + SchemaMapping(Schema arrowSchema, MessageType parquetSchema, List<TypeMapping> children) { + super(); + this.arrowSchema = arrowSchema; + this.parquetSchema = parquetSchema; + this.children = Collections.unmodifiableList(children); + } + + public Schema getArrowSchema() { + return arrowSchema; + } + + public MessageType getParquetSchema() { + return parquetSchema; + } + + /** + * @return mapping between individual fields of each of the 2 schemas (should be the same width) + */ + public List<TypeMapping> getChildren() { + return children; + } + + /** + * Visitor used to traverse a schema mapping + * @param <T> the type returned by each visit method + */ + public interface TypeMappingVisitor<T> { + T visit(PrimitiveTypeMapping primitiveTypeMapping); + T visit(StructTypeMapping structTypeMapping); + T visit(UnionTypeMapping unionTypeMapping); + T visit(ListTypeMapping listTypeMapping); + T visit(RepeatedTypeMapping repeatedTypeMapping); + } + + /** + * Mapping between an Arrow field and a Parquet type; the children hold the mappings of the nested fields + */ + public abstract static class TypeMapping { + + private final Field arrowField; + private final Type parquetType; + /* assigned exactly once, in the constructor, like the two fields above */ + private final List<TypeMapping> children; + + TypeMapping(Field arrowField, Type parquetType, List<TypeMapping> children) { + super(); + this.arrowField = arrowField; + this.parquetType = parquetType; + this.children 
= children; + } + + public Field getArrowField() { + return arrowField; + } + + public Type getParquetType() { + return parquetType; + } + + public List<TypeMapping> getChildren() { + return children; + } + + public abstract <T> T accept(TypeMappingVisitor<T> visitor); + + } + + /** + * mapping between two primitive types + */ + public static class PrimitiveTypeMapping extends TypeMapping { + public PrimitiveTypeMapping(Field arrowField, PrimitiveType parquetType) { + super(arrowField, parquetType, Collections.<TypeMapping>emptyList()); + } + + @Override + public <T> T accept(TypeMappingVisitor<T> visitor) { + return visitor.visit(this); + } + } + + /** + * mapping of a struct type + */ + public static class StructTypeMapping extends TypeMapping { + public StructTypeMapping(Field arrowField, GroupType parquetType, List<TypeMapping> children) { + super(arrowField, parquetType, children); + } + + @Override + public <T> T accept(TypeMappingVisitor<T> visitor) { + return visitor.visit(this); + } + } + + /** + * mapping of a union type + */ + public static class UnionTypeMapping extends TypeMapping { + public UnionTypeMapping(Field arrowField, GroupType parquetType, List<TypeMapping> children) { + super(arrowField, parquetType, children); + } + + @Override + public <T> T accept(TypeMappingVisitor<T> visitor) { + return visitor.visit(this); + } + } + + /** + * mapping of a List type and standard 3-level List annotated Parquet type + */ + public static class ListTypeMapping extends TypeMapping { + private final List3Levels list3Levels; + private final TypeMapping child; + + /** + * @param arrowField the Arrow list field + * @param list3Levels the Parquet list structure; its list type becomes the Parquet type of this mapping + * @param child the mapping of the list element + * @throws IllegalArgumentException if the child mapping does not refer to the very same Parquet type instance as the element of list3Levels + */ + public ListTypeMapping(Field arrowField, List3Levels list3Levels, TypeMapping child) { + super(arrowField, list3Levels.getList(), asList(child)); + this.list3Levels = list3Levels; + this.child = child; + /* identity comparison: the child mapping must wrap the element instance itself */ + if (list3Levels.getElement() != child.getParquetType()) { + /* report the two Parquet types being compared; TypeMapping has no toString() of its own */ + throw new IllegalArgumentException("List element type does not match child mapping: " + list3Levels.getElement() + " <=> " + child.getParquetType()); + } + } + + public List3Levels getList3Levels() { + return list3Levels; + 
} + + public TypeMapping getChild() { + return child; + } + + @Override + public <T> T accept(TypeMappingVisitor<T> visitor) { + return visitor.visit(this); + } + } + + /** + * mapping of a List type and repeated Parquet field (non-list annotated) + */ + public static class RepeatedTypeMapping extends TypeMapping { + private final TypeMapping child; + + public RepeatedTypeMapping(Field arrowField, Type parquetType, TypeMapping child) { + super(arrowField, parquetType, asList(child)); + this.child = child; + } + + public TypeMapping getChild() { + return child; + } + + @Override + public <T> T accept(TypeMappingVisitor<T> visitor) { + return visitor.visit(this); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cf991604/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java ---------------------------------------------------------------------- diff --git a/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java b/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java new file mode 100644 index 0000000..ec2b807 --- /dev/null +++ b/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.arrow.schema; + +import static java.util.Arrays.asList; +import static org.apache.parquet.schema.OriginalType.DATE; +import static org.apache.parquet.schema.OriginalType.DECIMAL; +import static org.apache.parquet.schema.OriginalType.INTERVAL; +import static org.apache.parquet.schema.OriginalType.INT_16; +import static org.apache.parquet.schema.OriginalType.INT_32; +import static org.apache.parquet.schema.OriginalType.INT_64; +import static org.apache.parquet.schema.OriginalType.INT_8; +import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS; +import static org.apache.parquet.schema.OriginalType.TIME_MILLIS; +import static org.apache.parquet.schema.OriginalType.UINT_16; +import static org.apache.parquet.schema.OriginalType.UINT_32; +import static org.apache.parquet.schema.OriginalType.UINT_64; +import static org.apache.parquet.schema.OriginalType.UINT_8; +import static org.apache.parquet.schema.OriginalType.UTF8; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; + +import java.io.IOException; +import java.util.List; + +import org.apache.arrow.flatbuf.IntervalUnit; +import org.apache.arrow.flatbuf.Precision; +import org.apache.arrow.flatbuf.TimeUnit; +import org.apache.arrow.flatbuf.UnionMode; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; 
+import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.parquet.arrow.schema.SchemaMapping.ListTypeMapping; +import org.apache.parquet.arrow.schema.SchemaMapping.PrimitiveTypeMapping; +import org.apache.parquet.arrow.schema.SchemaMapping.RepeatedTypeMapping; +import org.apache.parquet.arrow.schema.SchemaMapping.StructTypeMapping; +import org.apache.parquet.arrow.schema.SchemaMapping.TypeMapping; +import org.apache.parquet.arrow.schema.SchemaMapping.TypeMappingVisitor; +import org.apache.parquet.arrow.schema.SchemaMapping.UnionTypeMapping; +import org.apache.parquet.example.Paper; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.junit.Test; + +import junit.framework.Assert; + +/** + * @see SchemaConverter + */ +public class TestSchemaConverter { + + private static Field field(String name, boolean nullable, ArrowType type, Field... children) { + return new Field(name, nullable, type, asList(children)); + } + + private static Field field(String name, ArrowType type, Field... children) { + return field(name, true, type, children); + } + + private final Schema complexArrowSchema = new Schema(asList( + field("a", false, new ArrowType.Int(8, true)), + field("b", new ArrowType.Struct_(), + field("c", new ArrowType.Int(16, true)), + field("d", new ArrowType.Utf8())), + field("e", new ArrowType.List(), field(null, new ArrowType.Date())), + field("f", new ArrowType.FloatingPoint(Precision.SINGLE)), + field("g", new ArrowType.Timestamp(TimeUnit.MILLISECOND)), + field("h", new ArrowType.Interval(IntervalUnit.DAY_TIME)) + )); + private final MessageType complexParquetSchema = Types.buildMessage() + .addField(Types.optional(INT32).as(INT_8).named("a")) + .addField(Types.optionalGroup() + .addField(Types.optional(INT32).as(INT_16).named("c")) + .addField(Types.optional(BINARY).as(UTF8).named("d")) + .named("b")) + .addField(Types.optionalList(). 
+ setElementType(Types.optional(INT32).as(DATE).named("element")) + .named("e")) + .addField(Types.optional(FLOAT).named("f")) + .addField(Types.optional(INT64).as(TIMESTAMP_MILLIS).named("g")) + .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("h")) + .named("root"); + + private final Schema allTypesArrowSchema = new Schema(asList( + field("a", false, new ArrowType.Null()), + field("b", new ArrowType.Struct_(), field("ba", new ArrowType.Null())), + field("c", new ArrowType.List(), field("ca", new ArrowType.Null())), + field("d", new ArrowType.Union(UnionMode.Sparse, new int[] {1, 2, 3}), field("da", new ArrowType.Null())), + field("e", new ArrowType.Int(8, true)), + field("e1", new ArrowType.Int(16, true)), + field("e2", new ArrowType.Int(32, true)), + field("e3", new ArrowType.Int(64, true)), + field("e4", new ArrowType.Int(8, false)), + field("e5", new ArrowType.Int(16, false)), + field("e6", new ArrowType.Int(32, false)), + field("e7", new ArrowType.Int(64, false)), + field("f", new ArrowType.FloatingPoint(Precision.SINGLE)), + field("f1", new ArrowType.FloatingPoint(Precision.DOUBLE)), + field("g", new ArrowType.Utf8()), + field("h", new ArrowType.Binary()), + field("i", new ArrowType.Bool()), + field("j", new ArrowType.Decimal(5, 5)), + field("j1", new ArrowType.Decimal(15, 5)), + field("j2", new ArrowType.Decimal(25, 5)), + field("k", new ArrowType.Date()), + field("l", new ArrowType.Time()), + field("m", new ArrowType.Timestamp(TimeUnit.MILLISECOND)), + field("n", new ArrowType.Interval(IntervalUnit.DAY_TIME)), + field("n1", new ArrowType.Interval(IntervalUnit.YEAR_MONTH)) + )); + private final MessageType allTypesParquetSchema = Types.buildMessage() + .addField(Types.optional(BINARY).named("a")) + .addField(Types.optionalGroup() + .addField(Types.optional(BINARY).named("ba")) + .named("b")) + .addField(Types.optionalList(). 
+ setElementType(Types.optional(BINARY).named("element")) + .named("c")) + .addField(Types.optionalGroup() + .addField(Types.optional(BINARY).named("da")) + .named("d")) + .addField(Types.optional(INT32).as(INT_8).named("e")) + .addField(Types.optional(INT32).as(INT_16).named("e1")) + .addField(Types.optional(INT32).as(INT_32).named("e2")) + .addField(Types.optional(INT64).as(INT_64).named("e3")) + .addField(Types.optional(INT32).as(UINT_8).named("e4")) + .addField(Types.optional(INT32).as(UINT_16).named("e5")) + .addField(Types.optional(INT32).as(UINT_32).named("e6")) + .addField(Types.optional(INT64).as(UINT_64).named("e7")) + .addField(Types.optional(FLOAT).named("f")) + .addField(Types.optional(DOUBLE).named("f1")) + .addField(Types.optional(BINARY).as(UTF8).named("g")) + .addField(Types.optional(BINARY).named("h")) + .addField(Types.optional(BOOLEAN).named("i")) + .addField(Types.optional(INT32).as(DECIMAL).precision(5).scale(5).named("j")) + .addField(Types.optional(INT64).as(DECIMAL).precision(15).scale(5).named("j1")) + .addField(Types.optional(BINARY).as(DECIMAL).precision(25).scale(5).named("j2")) + .addField(Types.optional(INT32).as(DATE).named("k")) + .addField(Types.optional(INT32).as(TIME_MILLIS).named("l")) + .addField(Types.optional(INT64).as(TIMESTAMP_MILLIS).named("m")) + .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("n")) + .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("n1")) + .named("root"); + + private final Schema supportedTypesArrowSchema = new Schema(asList( + field("b", new ArrowType.Struct_(), field("ba", new ArrowType.Binary())), + field("c", new ArrowType.List(), field(null, new ArrowType.Binary())), + field("e", new ArrowType.Int(8, true)), + field("e1", new ArrowType.Int(16, true)), + field("e2", new ArrowType.Int(32, true)), + field("e3", new ArrowType.Int(64, true)), + field("e4", new ArrowType.Int(8, false)), + field("e5", new ArrowType.Int(16, false)), + field("e6", new 
ArrowType.Int(32, false)), + field("e7", new ArrowType.Int(64, false)), + field("f", new ArrowType.FloatingPoint(Precision.SINGLE)), + field("f1", new ArrowType.FloatingPoint(Precision.DOUBLE)), + field("g", new ArrowType.Utf8()), + field("h", new ArrowType.Binary()), + field("i", new ArrowType.Bool()), + field("j", new ArrowType.Decimal(5, 5)), + field("j1", new ArrowType.Decimal(15, 5)), + field("j2", new ArrowType.Decimal(25, 5)), + field("k", new ArrowType.Date()), + field("l", new ArrowType.Time()), + field("m", new ArrowType.Timestamp(TimeUnit.MILLISECOND)) + )); + + private final MessageType supportedTypesParquetSchema = Types.buildMessage() + .addField(Types.optionalGroup() + .addField(Types.optional(BINARY).named("ba")) + .named("b")) + .addField(Types.optionalList(). + setElementType(Types.optional(BINARY).named("element")) + .named("c")) + .addField(Types.optional(INT32).as(INT_8).named("e")) + .addField(Types.optional(INT32).as(INT_16).named("e1")) + .addField(Types.optional(INT32).as(INT_32).named("e2")) + .addField(Types.optional(INT64).as(INT_64).named("e3")) + .addField(Types.optional(INT32).as(UINT_8).named("e4")) + .addField(Types.optional(INT32).as(UINT_16).named("e5")) + .addField(Types.optional(INT32).as(UINT_32).named("e6")) + .addField(Types.optional(INT64).as(UINT_64).named("e7")) + .addField(Types.optional(FLOAT).named("f")) + .addField(Types.optional(DOUBLE).named("f1")) + .addField(Types.optional(BINARY).as(UTF8).named("g")) + .addField(Types.optional(BINARY).named("h")) + .addField(Types.optional(BOOLEAN).named("i")) + .addField(Types.optional(INT32).as(DECIMAL).precision(5).scale(5).named("j")) + .addField(Types.optional(INT64).as(DECIMAL).precision(15).scale(5).named("j1")) + .addField(Types.optional(BINARY).as(DECIMAL).precision(25).scale(5).named("j2")) + .addField(Types.optional(INT32).as(DATE).named("k")) + .addField(Types.optional(INT32).as(TIME_MILLIS).named("l")) + .addField(Types.optional(INT64).as(TIMESTAMP_MILLIS).named("m")) 
+ .named("root"); + + private final Schema paperArrowSchema = new Schema(asList( + field("DocId", false, new ArrowType.Int(64, true)), + field("Links", new ArrowType.Struct_(), + field("Backward", false, new ArrowType.List(), field(null, false, new ArrowType.Int(64, true))), + field("Forward", false, new ArrowType.List(), field(null, false, new ArrowType.Int(64, true))) + ), + field("Name", false, new ArrowType.List(), + field(null, false, new ArrowType.Struct_(), + field("Language", false, new ArrowType.List(), + field(null, false, new ArrowType.Struct_(), + field("Code", false, new ArrowType.Binary()), + field("Country", new ArrowType.Binary()) + ) + ), + field("Url", new ArrowType.Binary()) + ) + ) + )); + + private SchemaConverter converter = new SchemaConverter(); + + @Test + public void testComplexArrowToParquet() throws IOException { + MessageType parquet = converter.fromArrow(complexArrowSchema).getParquetSchema(); + Assert.assertEquals(complexParquetSchema.toString(), parquet.toString()); // easier to read + Assert.assertEquals(complexParquetSchema, parquet); + } + + @Test + public void testAllArrowToParquet() throws IOException { + MessageType parquet = converter.fromArrow(allTypesArrowSchema).getParquetSchema(); + Assert.assertEquals(allTypesParquetSchema.toString(), parquet.toString()); // easier to read + Assert.assertEquals(allTypesParquetSchema, parquet); + } + + @Test + public void testSupportedParquetToArrow() throws IOException { + Schema arrow = converter.fromParquet(supportedTypesParquetSchema).getArrowSchema(); + assertEquals(supportedTypesArrowSchema, arrow); + } + + @Test + public void testRepeatedParquetToArrow() throws IOException { + Schema arrow = converter.fromParquet(Paper.schema).getArrowSchema(); + assertEquals(paperArrowSchema, arrow); + } + + public void assertEquals(Schema left, Schema right) { + compareFields(left.getFields(), right.getFields()); + Assert.assertEquals(left, right); + } + + /** + * for more pinpointed error on what 
is different + * @param left + * @param right + */ + private void compareFields(List<Field> left, List<Field> right) { + Assert.assertEquals(left + "\n" + right, left.size(), right.size()); + int size = left.size(); + for (int i = 0; i < size; i++) { + Field expectedField = left.get(i); + Field field = right.get(i); + compareFields(expectedField.getChildren(), field.getChildren()); + Assert.assertEquals(expectedField, field); + } + } + + @Test + public void testAllMap() throws IOException { + SchemaMapping map = converter.map(allTypesArrowSchema, allTypesParquetSchema); + Assert.assertEquals("p, s<p>, l<p>, u<p>, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p", toSummaryString(map)); + } + + private String toSummaryString(SchemaMapping map) { + List<TypeMapping> fields = map.getChildren(); + return toSummaryString(fields); + } + + private String toSummaryString(List<TypeMapping> fields) { + final StringBuilder sb = new StringBuilder(); + for (TypeMapping typeMapping : fields) { + if (sb.length() != 0) { + sb.append(", "); + } + sb.append( + typeMapping.accept(new TypeMappingVisitor<String>() { + @Override + public String visit(PrimitiveTypeMapping primitiveTypeMapping) { + return "p"; + } + + @Override + public String visit(StructTypeMapping structTypeMapping) { + return "s"; + } + + @Override + public String visit(UnionTypeMapping unionTypeMapping) { + return "u"; + } + + @Override + public String visit(ListTypeMapping listTypeMapping) { + return "l"; + } + + @Override + public String visit(RepeatedTypeMapping repeatedTypeMapping) { + return "r"; + } + }) + ); + if (typeMapping.getChildren() != null && !typeMapping.getChildren().isEmpty()) { + sb.append("<").append(toSummaryString(typeMapping.getChildren())).append(">"); + } + } + return sb.toString(); + } + + @Test + public void testRepeatedMap() throws IOException { + SchemaMapping map = converter.map(paperArrowSchema, Paper.schema); + Assert.assertEquals("p, s<r<p>, r<p>>, r<s<r<s<p, p>>, p>>", 
toSummaryString(map)); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cf991604/parquet-column/src/main/java/org/apache/parquet/schema/Types.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java index 9af71af..5526cfc 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -1071,10 +1071,11 @@ public class Types { super(returnType); } - public void setElementType(Type elementType) { + public THIS setElementType(Type elementType) { Preconditions.checkState(this.elementType == null, "Only one element can be built with a ListBuilder"); this.elementType = elementType; + return self(); } public static class ElementBuilder<LP, L extends BaseListBuilder<LP, L>> http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cf991604/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d50c132..5f57e13 100644 --- a/pom.xml +++ b/pom.xml @@ -92,6 +92,7 @@ </properties> <modules> + <module>parquet-arrow</module> <module>parquet-avro</module> <module>parquet-benchmarks</module> <module>parquet-cascading</module>
