Repository: hive Updated Branches: refs/heads/master b8aa16ff6 -> 2efb7d38a
HIVE-17972: Implement Parquet vectorization reader for Map type (Colin Ma, reviewed by Ferdinand Xu) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2efb7d38 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2efb7d38 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2efb7d38 Branch: refs/heads/master Commit: 2efb7d38af639ee77f817f2ebf772314309174be Parents: b8aa16f Author: Ferdinand Xu <cheng.a...@intel.com> Authored: Thu Nov 30 11:24:31 2017 +0800 Committer: Ferdinand Xu <cheng.a...@intel.com> Committed: Thu Nov 30 11:24:31 2017 +0800 ---------------------------------------------------------------------- .../vector/VectorizedMapColumnReader.java | 69 ++++ .../vector/VectorizedParquetRecordReader.java | 12 + .../parquet/TestVectorizedMapColumnReader.java | 327 +++++++++++++++++++ .../parquet/VectorizedColumnReaderTestBase.java | 28 ++ 4 files changed, 436 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2efb7d38/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedMapColumnReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedMapColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedMapColumnReader.java new file mode 100644 index 0000000..6099a28 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedMapColumnReader.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import java.io.IOException; + +/** + * It's column level Parquet reader which is used to read a batch of records for a map column. + */ +public class VectorizedMapColumnReader implements VectorizedColumnReader { + private VectorizedListColumnReader keyColumnReader; + private VectorizedListColumnReader valueColumnReader; + + public VectorizedMapColumnReader(VectorizedListColumnReader keyColumnReader, + VectorizedListColumnReader valueColumnReader) { + this.keyColumnReader = keyColumnReader; + this.valueColumnReader = valueColumnReader; + } + + @Override + public void readBatch(int total, ColumnVector column, TypeInfo columnType) throws IOException { + MapColumnVector mapColumnVector = (MapColumnVector) column; + MapTypeInfo mapTypeInfo = (MapTypeInfo) columnType; + ListTypeInfo keyListTypeInfo = new ListTypeInfo(); + keyListTypeInfo.setListElementTypeInfo(mapTypeInfo.getMapKeyTypeInfo()); + ListTypeInfo valueListTypeInfo = new ListTypeInfo(); + valueListTypeInfo.setListElementTypeInfo(mapTypeInfo.getMapValueTypeInfo()); + + // initialize 2 ListColumnVector for keys and values + ListColumnVector keyListColumnVector = new ListColumnVector(); + ListColumnVector valueListColumnVector = new ListColumnVector(); + // read the keys and values + keyColumnReader.readBatch(total, keyListColumnVector, keyListTypeInfo); + valueColumnReader.readBatch(total, valueListColumnVector, valueListTypeInfo); + + // set the related attributes according to the keys and values + mapColumnVector.keys = keyListColumnVector.child; + mapColumnVector.values = valueListColumnVector.child; + mapColumnVector.isNull = keyListColumnVector.isNull; + mapColumnVector.offsets = keyListColumnVector.offsets; + mapColumnVector.lengths = keyListColumnVector.lengths; + mapColumnVector.childCount = keyListColumnVector.childCount; + mapColumnVector.isRepeating = keyListColumnVector.isRepeating + && valueListColumnVector.isRepeating; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2efb7d38/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index 941bd7d..4303ca9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -503,6 +503,18 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase return new VectorizedListColumnReader(descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type); case MAP: + if (columnDescriptors == null || columnDescriptors.isEmpty()) { + throw new RuntimeException( + "Failed to find related Parquet column descriptor with type " + type); + } + List<Type> kvTypes = type.asGroupType().getFields(); + VectorizedListColumnReader keyListColumnReader = new VectorizedListColumnReader( + descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion, + kvTypes.get(0)); + VectorizedListColumnReader valueListColumnReader = new VectorizedListColumnReader( + descriptors.get(1), pages.getPageReader(descriptors.get(1)), skipTimestampConversion, + kvTypes.get(1)); + return new VectorizedMapColumnReader(keyListColumnReader, valueListColumnReader); case UNION: default: throw new RuntimeException("Unsupported category " + typeInfo.getCategory().name()); http://git-wip-us.apache.org/repos/asf/hive/blob/2efb7d38/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java new file mode 100644 index 0000000..c33e8ab --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java @@ -0,0 +1,327 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.io.api.Binary; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBase { + + protected static void writeMapData(ParquetWriter<Group> writer, boolean isDictionaryEncoding, + int elementNum) throws IOException { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + int mapMaxSize = 4; + int mapElementIndex = 0; + for (int i = 0; i < elementNum; i++) { + boolean isNull = isNull(i); + Group group = f.newGroup(); + + int mapSize = i % mapMaxSize + 1; + if (!isNull) { + for (int j = 0; j < mapSize; j++) { + int intValForMap = getIntValue(isDictionaryEncoding, mapElementIndex); + long longValForMap = getLongValue(isDictionaryEncoding, mapElementIndex); + double doubleValForMap = getDoubleValue(isDictionaryEncoding, mapElementIndex); + float floatValForMap = getFloatValue(isDictionaryEncoding, mapElementIndex); + Binary binaryValForMap = getBinaryValue(isDictionaryEncoding, mapElementIndex); + HiveDecimal hd = getDecimal(isDictionaryEncoding, mapElementIndex).setScale(2); + HiveDecimalWritable hdw = new HiveDecimalWritable(hd); + Binary decimalValForMap = Binary.fromConstantByteArray(hdw.getInternalStorage()); + group.addGroup("map_int32").append("key", intValForMap).append("value", intValForMap); + group.addGroup("map_int64").append("key", longValForMap).append("value", longValForMap); + group.addGroup("map_double").append("key", doubleValForMap) + .append("value", doubleValForMap); + group.addGroup("map_float").append("key", floatValForMap).append("value", floatValForMap); + group.addGroup("map_binary").append("key", binaryValForMap) + .append("value", binaryValForMap); + group.addGroup("map_decimal").append("key", decimalValForMap) + .append("value", decimalValForMap); + mapElementIndex++; + } + } + writer.write(group); + } + writer.close(); + } + + protected static void writeRepeateMapData( + ParquetWriter<Group> writer, int elementNum, boolean isNull) throws IOException { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + int mapMaxSize = 4; + for (int i = 0; i < elementNum; i++) { + Group group = f.newGroup(); + if (!isNull) { + for (int j = 0; j < mapMaxSize; j++) { + group.addGroup("map_int32_for_repeat_test").append("key", j).append("value", j); + } + } + writer.write(group); + } + writer.close(); + } + + @Test + public void testMapReadLessOneBatch() throws Exception { + boolean isDictionaryEncoding = false; + removeFile(); + writeMapData(initWriterFromFile(), isDictionaryEncoding, 1023); + testMapReadAllType(isDictionaryEncoding, 1023); + removeFile(); + isDictionaryEncoding = true; + writeMapData(initWriterFromFile(), isDictionaryEncoding, 1023); + testMapReadAllType(isDictionaryEncoding, 1023); + removeFile(); + } + + @Test + public void testMapReadEqualOneBatch() throws Exception { + boolean isDictionaryEncoding = false; + removeFile(); + writeMapData(initWriterFromFile(), isDictionaryEncoding, 1024); + testMapReadAllType(isDictionaryEncoding, 1024); + removeFile(); + isDictionaryEncoding = true; + writeMapData(initWriterFromFile(), isDictionaryEncoding, 1024); + testMapReadAllType(isDictionaryEncoding, 1024); + removeFile(); + } + + @Test + public void testMapReadMoreOneBatch() throws Exception { + boolean isDictionaryEncoding = false; + removeFile(); + writeMapData(initWriterFromFile(), isDictionaryEncoding, 1025); + testMapReadAllType(isDictionaryEncoding, 1025); + removeFile(); + isDictionaryEncoding = true; + writeMapData(initWriterFromFile(), isDictionaryEncoding, 1025); + testMapReadAllType(isDictionaryEncoding, 1025); + removeFile(); + } + + @Test + public void testRepeateMapRead() throws Exception { + removeFile(); + writeRepeateMapData(initWriterFromFile(), 1023, false); + testRepeateMapRead(1023, false); + removeFile(); + writeRepeateMapData(initWriterFromFile(), 1023, true); + testRepeateMapRead(1023, true); + removeFile(); + writeRepeateMapData(initWriterFromFile(), 1024, false); + testRepeateMapRead(1024, false); + removeFile(); + writeRepeateMapData(initWriterFromFile(), 1024, true); + testRepeateMapRead(1024, true); + removeFile(); + writeRepeateMapData(initWriterFromFile(), 1025, false); + testRepeateMapRead(1025, false); + removeFile(); + writeRepeateMapData(initWriterFromFile(), 1025, true); + testRepeateMapRead(1025, true); + removeFile(); + } + + private void testMapReadAllType(boolean isDictionaryEncoding, int elementNum) throws Exception { + testMapRead(isDictionaryEncoding, "int", elementNum); + testMapRead(isDictionaryEncoding, "long", elementNum); + testMapRead(isDictionaryEncoding, "double", elementNum); + testMapRead(isDictionaryEncoding, "float", elementNum); + testMapRead(isDictionaryEncoding, "binary", elementNum); + testMapRead(isDictionaryEncoding, "decimal", elementNum); + } + + private void testMapRead(boolean isDictionaryEncoding, String type, + int elementNum) throws Exception { + Configuration conf = new Configuration(); + setTypeConfiguration(type, conf); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = createTestParquetReader(getSchema(type), conf); + VectorizedRowBatch previous = reader.createValue(); + int row = 0; + int index = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + MapColumnVector mapVector = (MapColumnVector) previous.cols[0]; + + //since Repeating only happens when offset length is 1. + assertEquals((mapVector.offsets.length == 1),mapVector.isRepeating); + + for (int i = 0; i < mapVector.offsets.length; i++) { + if (row == elementNum) { + assertEquals(i, mapVector.offsets.length - 1); + break; + } + long start = mapVector.offsets[i]; + long length = mapVector.lengths[i]; + boolean isNull = isNull(row); + if (isNull) { + assertEquals(mapVector.isNull[i], true); + } else { + for (long j = 0; j < length; j++) { + assertValue(type, mapVector.keys, isDictionaryEncoding, index, (int) (start + j)); + assertValue(type, mapVector.values, isDictionaryEncoding, index, (int) (start + j)); + index++; + } + } + row++; + } + } + assertEquals("It doesn't exit at expected position", elementNum, row); + } finally { + reader.close(); + } + } + + private void testRepeateMapRead(int elementNum, boolean isNull) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "map_int32_for_repeat_test"); + conf.set(IOConstants.COLUMNS_TYPES, "map<int,int>"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + String schema = "message hive_schema {\n" + + " repeated group map_int32_for_repeat_test (MAP_KEY_VALUE) {\n" + + " required int32 key;\n" + + " optional int32 value;\n" + + " }\n" + + "}\n"; + VectorizedParquetRecordReader reader = createTestParquetReader(schema, conf); + VectorizedRowBatch previous = reader.createValue(); + int row = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + MapColumnVector mapVector = (MapColumnVector) previous.cols[0]; + + assertTrue(mapVector.isRepeating); + assertEquals(isNull, mapVector.isNull[0]); + + for (int i = 0; i < mapVector.offsets.length; i++) { + if (row == elementNum) { + assertEquals(i, mapVector.offsets.length - 1); + break; + } + row++; + } + } + assertEquals("It doesn't exit at expected position", elementNum, row); + } finally { + reader.close(); + } + } + + private void setTypeConfiguration(String type, Configuration conf) { + if ("int".equals(type)) { + conf.set(IOConstants.COLUMNS, "map_int32"); + conf.set(IOConstants.COLUMNS_TYPES, "map<int,int>"); + } else if ("long".equals(type)) { + conf.set(IOConstants.COLUMNS, "map_int64"); + conf.set(IOConstants.COLUMNS_TYPES, "map<bigint,bigint>"); + } else if ("double".equals(type)) { + conf.set(IOConstants.COLUMNS, "map_double"); + conf.set(IOConstants.COLUMNS_TYPES, "map<double,double>"); + } else if ("float".equals(type)) { + conf.set(IOConstants.COLUMNS, "map_float"); + conf.set(IOConstants.COLUMNS_TYPES, "map<float,float>"); + } else if ("binary".equals(type)) { + conf.set(IOConstants.COLUMNS, "map_binary"); + conf.set(IOConstants.COLUMNS_TYPES, "map<string,string>"); + } else if ("decimal".equals(type)) { + conf.set(IOConstants.COLUMNS, "map_decimal"); + conf.set(IOConstants.COLUMNS_TYPES, "map<decimal(5,2),decimal(5,2)>"); + } + } + + private String getSchema(String type) { + String schemaFormat = "message hive_schema {\n" + + " repeated group map_%s (MAP_KEY_VALUE) {\n" + + " required %s key %s;\n" + + " optional %s value %s;\n" + + " }\n" + + "}\n"; + switch (type){ + case "int": + return String.format(schemaFormat, "int32", "int32", "", "int32", ""); + case "long": + return String.format(schemaFormat, "int64", "int64", "", "int64", ""); + case "double": + return String.format(schemaFormat, "double", "double", "", "double", ""); + case "float": + return String.format(schemaFormat, "float", "float", "", "float", ""); + case "binary": + return String.format(schemaFormat, "binary", "binary", "", "binary", ""); + case "decimal": + return String.format(schemaFormat, "decimal", "binary", "(DECIMAL(5,2))", + "binary", "(DECIMAL(5,2))"); + default: + throw new RuntimeException("Unsupported type for TestVectorizedMapColumnReader!"); + } + } + + private void assertValue(String type, ColumnVector childVector, boolean isDictionaryEncoding, + int valueIndex, int position) { + if ("int".equals(type)) { + assertEquals(getIntValue(isDictionaryEncoding, valueIndex), + ((LongColumnVector)childVector).vector[position]); + } else if ("long".equals(type)) { + assertEquals(getLongValue(isDictionaryEncoding, valueIndex), + ((LongColumnVector)childVector).vector[position]); + } else if ("double".equals(type)) { + assertEquals(getDoubleValue(isDictionaryEncoding, valueIndex), + ((DoubleColumnVector)childVector).vector[position], 0); + } else if ("float".equals(type)) { + assertEquals(getFloatValue(isDictionaryEncoding, valueIndex), + ((DoubleColumnVector)childVector).vector[position], 0); + } else if ("binary".equals(type)) { + String actual = new String(ArrayUtils + .subarray(((BytesColumnVector)childVector).vector[position], + ((BytesColumnVector)childVector).start[position], + ((BytesColumnVector)childVector).start[position] + + ((BytesColumnVector)childVector).length[position])); + assertEquals(getStr(isDictionaryEncoding, valueIndex), actual); + } else if ("decimal".equals(type)) { + assertEquals(getDecimal(isDictionaryEncoding, valueIndex), + ((DecimalColumnVector)childVector).vector[position].getHiveDecimal()); + } else { + throw new RuntimeException("Unsupported type for TestVectorizedMapColumnReader!"); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2efb7d38/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java index 929e991..33c5c82 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java @@ -125,6 +125,34 @@ public class VectorizedColumnReaderTestBase { + "repeated binary list_binary_field;" + "repeated binary list_decimal_field (DECIMAL(5,2));" + "repeated int32 list_int32_field_for_repeat_test;" + + "repeated group map_int32 (MAP_KEY_VALUE) {\n" + + " required int32 key;\n" + + " optional int32 value;\n" + + "}\n" + + "repeated group map_int64 (MAP_KEY_VALUE) {\n" + + " required int64 key;\n" + + " optional int64 value;\n" + + "}\n" + + "repeated group map_double (MAP_KEY_VALUE) {\n" + + " required double key;\n" + + " optional double value;\n" + + "}\n" + + "repeated group map_float (MAP_KEY_VALUE) {\n" + + " required float key;\n" + + " optional float value;\n" + + "}\n" + + "repeated group map_binary (MAP_KEY_VALUE) {\n" + + " required binary key;\n" + + " optional binary value;\n" + + "}\n" + + "repeated group map_decimal (MAP_KEY_VALUE) {\n" + + " required binary key (DECIMAL(5,2));\n" + + " optional binary value (DECIMAL(5,2));\n" + + "}\n" + + "repeated group map_int32_for_repeat_test (MAP_KEY_VALUE) {\n" + + " required int32 key;\n" + + " optional int32 value;\n" + + "}\n" + "} "); protected static void removeFile() throws IOException {