Repository: hive Updated Branches: refs/heads/master 646ccce8e -> 7acc4ce1b
HIVE-18211: Support to read multiple level definition for Map type in Parquet file (Colin Ma, reviewed by Ferdinand Xu) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7acc4ce1 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7acc4ce1 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7acc4ce1 Branch: refs/heads/master Commit: 7acc4ce1bbae060d890494c1499938c1eda5f3b6 Parents: 646ccce Author: Ferdinand Xu <cheng.a...@intel.com> Authored: Mon Dec 18 09:35:16 2017 +0800 Committer: Ferdinand Xu <cheng.a...@intel.com> Committed: Mon Dec 18 09:35:16 2017 +0800 ---------------------------------------------------------------------- .../vector/VectorizedParquetRecordReader.java | 27 +++++++++++++++++++- .../parquet/TestVectorizedMapColumnReader.java | 26 ++++++++++++++++++- 2 files changed, 51 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7acc4ce1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index 4303ca9..bffe008 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -61,6 +61,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HadoopStreams; import org.apache.parquet.io.InputFile; import org.apache.parquet.io.SeekableInputStream; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.InvalidSchemaException; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; @@ -97,6 +98,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase private VectorizedRowBatchCtx rbCtx; private Object[] partitionValues; private Path cacheFsPath; + private static final int MAP_DEFINITION_LEVEL_MAX = 3; /** * For each request column, the reader to read this column. This is NULL if this column @@ -507,7 +509,30 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase throw new RuntimeException( "Failed to find related Parquet column descriptor with type " + type); } - List<Type> kvTypes = type.asGroupType().getFields(); + + // to handle the different Map definition in Parquet, eg: + // definition has 1 group: + // repeated group map (MAP_KEY_VALUE) + // {required binary key (UTF8); optional binary value (UTF8);} + // definition has 2 groups: + // optional group m1 (MAP) { + // repeated group map (MAP_KEY_VALUE) + // {required binary key (UTF8); optional binary value (UTF8);} + // } + int nestGroup = 0; + GroupType groupType = type.asGroupType(); + // if FieldCount == 2, get types for key & value, + // otherwise, continue to get the group type until MAP_DEFINITION_LEVEL_MAX. + while (groupType.getFieldCount() < 2) { + if (nestGroup > MAP_DEFINITION_LEVEL_MAX) { + throw new RuntimeException( + "More than " + MAP_DEFINITION_LEVEL_MAX + " level is found in Map definition, " + + "Failed to get the field types for Map with type " + type); + } + groupType = groupType.getFields().get(0).asGroupType(); + nestGroup++; + } + List<Type> kvTypes = groupType.getFields(); VectorizedListColumnReader keyListColumnReader = new VectorizedListColumnReader( descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion, kvTypes.get(0)); http://git-wip-us.apache.org/repos/asf/hive/blob/7acc4ce1/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java index c33e8ab..185dfbb 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java @@ -56,6 +56,8 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas int mapSize = i % mapMaxSize + 1; if (!isNull) { + // the map_field is to test multiple level map definition + Group multipleLevelGroup = group.addGroup("map_field"); for (int j = 0; j < mapSize; j++) { int intValForMap = getIntValue(isDictionaryEncoding, mapElementIndex); long longValForMap = getLongValue(isDictionaryEncoding, mapElementIndex); @@ -74,6 +76,8 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas .append("value", binaryValForMap); group.addGroup("map_decimal").append("key", decimalValForMap) .append("value", decimalValForMap); + multipleLevelGroup.addGroup("map").append("key", binaryValForMap) + .append("value", binaryValForMap); mapElementIndex++; } } @@ -160,6 +164,14 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas removeFile(); } + @Test + public void testMultipleDefinitionMapRead() throws Exception { + removeFile(); + writeMapData(initWriterFromFile(), false, 1023); + testMapRead(false, "multipleLevel", 1023); + removeFile(); + } + private void testMapReadAllType(boolean isDictionaryEncoding, int elementNum) throws Exception { testMapRead(isDictionaryEncoding, "int", elementNum); testMapRead(isDictionaryEncoding, "long", elementNum); @@ -267,6 +279,9 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas } else if ("decimal".equals(type)) { conf.set(IOConstants.COLUMNS, "map_decimal"); conf.set(IOConstants.COLUMNS_TYPES, "map<decimal(5,2),decimal(5,2)>"); + } else if ("multipleLevel".equals(type)) { + conf.set(IOConstants.COLUMNS, "map_field"); + conf.set(IOConstants.COLUMNS_TYPES, "map<string,string>"); } } @@ -291,6 +306,15 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas case "decimal": return String.format(schemaFormat, "decimal", "binary", "(DECIMAL(5,2))", "binary", "(DECIMAL(5,2))"); + case "multipleLevel": + return "message hive_schema {\n" + + "optional group map_field (MAP) {\n" + + " repeated group map (MAP_KEY_VALUE) {\n" + + " required binary key;\n" + + " optional binary value;\n" + + " }\n" + + "}\n" + + "}\n"; default: throw new RuntimeException("Unsupported type for TestVectorizedMapColumnReader!"); } @@ -310,7 +334,7 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas } else if ("float".equals(type)) { assertEquals(getFloatValue(isDictionaryEncoding, valueIndex), ((DoubleColumnVector)childVector).vector[position], 0); - } else if ("binary".equals(type)) { + } else if ("binary".equals(type) || "multipleLevel".equals(type)) { String actual = new String(ArrayUtils .subarray(((BytesColumnVector)childVector).vector[position], ((BytesColumnVector)childVector).start[position],