This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit ffab527451e0a23eca96f38bce52c790553cc47e Author: Igor Guzenko <ihor.huzenko....@gmail.com> AuthorDate: Mon Aug 19 20:02:51 2019 +0300 DRILL-7326: Support repeated lists for CTAS parquet format closes #1844 --- .../codegen/templates/EventBasedRecordWriter.java | 13 ++ .../templates/ParquetOutputRecordWriter.java | 107 +++++++++++++++- .../exec/store/parquet/ParquetRecordWriter.java | 136 ++++++++++++++++++++- .../physical/impl/writer/TestParquetWriter.java | 108 ++++++++++++---- .../resources/jsoninput/repeated_list_of_maps.json | 2 + 5 files changed, 333 insertions(+), 33 deletions(-) diff --git a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java index 7357243..d87eeb3 100644 --- a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java @@ -118,6 +118,19 @@ public class EventBasedRecordWriter { } public abstract void writeField() throws IOException; + + /** + * Used by repeated converters for writing Parquet logical lists. 
+ * + * @throws IOException may be thrown by subsequent invocation of {@link #writeField()} + * in overridden methods + * @see <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists">Lists</a> + */ + public void writeListField() throws IOException { + throw new UnsupportedOperationException(String.format( + "Converter '%s' doesn't support writing list fields.", + getClass().getSimpleName())); + } } public static FieldConverter getConverter(RecordWriter recordWriter, int fieldId, String fieldName, FieldReader reader) { diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java index ff80701..1da206d 100644 --- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java @@ -71,6 +71,10 @@ import java.util.Map; */ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter implements RecordWriter { + protected static final String LIST = "list"; + protected static final String ELEMENT = "element"; + protected static final int ZERO_IDX = 0; + private RecordConsumer consumer; private MessageType schema; @@ -206,9 +210,9 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray(); byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})]; if (holder.getSign(holder.start, holder.buffer)) { - Arrays.fill(output, 0, output.length - bytes.length, (byte)0xFF); + Arrays.fill(output, 0, output.length - bytes.length, (byte) -1); } else { - Arrays.fill(output, 0, output.length - bytes.length, (byte)0x0); + Arrays.fill(output, 0, output.length - bytes.length, (byte) 0); } System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length); 
consumer.addBinary(Binary.fromByteArray(output)); @@ -268,10 +272,109 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp consumer.endField(fieldName, fieldId); </#if> } + + <#if mode.prefix == "Repeated"> + @Override + public void writeListField() { + if (reader.size() == 0) { + return; + } + consumer.startField(LIST, ZERO_IDX); + for (int i = 0; i < reader.size(); i++) { + consumer.startGroup(); + consumer.startField(ELEMENT, ZERO_IDX); + + <#if minor.class == "TinyInt" || + minor.class == "UInt1" || + minor.class == "UInt2" || + minor.class == "SmallInt" || + minor.class == "Int" || + minor.class == "Time" || + minor.class == "Decimal9" || + minor.class == "UInt4"> + reader.read(i, holder); + consumer.addInteger(holder.value); + <#elseif minor.class == "Float4"> + reader.read(i, holder); + consumer.addFloat(holder.value); + <#elseif minor.class == "BigInt" || + minor.class == "Decimal18" || + minor.class == "TimeStamp" || + minor.class == "UInt8"> + reader.read(i, holder); + consumer.addLong(holder.value); + <#elseif minor.class == "Date"> + reader.read(i, holder); + consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY)); + <#elseif minor.class == "Float8"> + reader.read(i, holder); + consumer.addDouble(holder.value); + <#elseif minor.class == "Bit"> + reader.read(i, holder); + consumer.addBoolean(holder.value == 1); + <#elseif minor.class == "Decimal28Sparse" || + minor.class == "Decimal38Sparse"> + <#if mode.prefix == "Repeated" > + <#else> + consumer.startField(fieldName, fieldId); + reader.read(holder); + byte[] bytes = DecimalUtility.getBigDecimalFromSparse( + holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray(); + byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})]; + if (holder.getSign(holder.start, holder.buffer)) { + Arrays.fill(output, 0, output.length - bytes.length, (byte) -1); + } else { + 
Arrays.fill(output, 0, output.length - bytes.length, (byte) 0); + } + System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length); + consumer.addBinary(Binary.fromByteArray(output)); + consumer.endField(fieldName, fieldId); + </#if> + <#elseif minor.class?contains("Interval")> + consumer.startField(fieldName, fieldId); + reader.read(holder); + <#if minor.class == "IntervalDay"> + Arrays.fill(output, 0, 4, (byte) 0); + IntervalUtility.intToLEByteArray(holder.days, output, 4); + IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8); + <#elseif minor.class == "IntervalYear"> + IntervalUtility.intToLEByteArray(holder.value, output, 0); + Arrays.fill(output, 4, 8, (byte) 0); + Arrays.fill(output, 8, 12, (byte) 0); + <#elseif minor.class == "Interval"> + IntervalUtility.intToLEByteArray(holder.months, output, 0); + IntervalUtility.intToLEByteArray(holder.days, output, 4); + IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8); + </#if> + consumer.addBinary(Binary.fromByteArray(output)); + consumer.endField(fieldName, fieldId); + + <#elseif + minor.class == "TimeTZ" || + minor.class == "Decimal28Dense" || + minor.class == "Decimal38Dense"> + <#elseif minor.class == "VarChar" || minor.class == "Var16Char" + || minor.class == "VarBinary" || minor.class == "VarDecimal"> + reader.read(i, holder); + <#if minor.class == "VarDecimal"> + decimalValueWriter.writeValue(consumer, holder.buffer, holder.start, holder.end, + reader.getField().getPrecision()); + <#else> + consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start))); + </#if> + </#if> + + consumer.endField(ELEMENT, ZERO_IDX); + consumer.endGroup(); + } + consumer.endField(LIST, ZERO_IDX); + } + </#if> } </#list> </#list> </#list> + private static class IntervalUtility { private static void intToLEByteArray(final int value, final byte[] output, final int outputIndex) { int shiftOrder = 0; diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index a9f7f14..999fdcf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -25,6 +25,8 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.SchemaPath; @@ -49,6 +51,7 @@ import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; import org.apache.drill.exec.store.ParquetOutputRecordWriter; import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.BitVector; +import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector; import org.apache.drill.exec.vector.complex.reader.FieldReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -76,6 +79,7 @@ import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Type.Repetition; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.parquet.schema.Types.ListBuilder; public class ParquetRecordWriter extends ParquetOutputRecordWriter { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class); @@ -289,22 +293,92 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { DataMode dataMode = field.getType().getMode(); switch (minorType) { case MAP: - List<Type> types = Lists.newArrayList(); - for (MaterializedField childField : field.getChildren()) { - types.add(getType(childField)); - } + List<Type> types = getChildrenTypes(field); return new GroupType(dataMode == 
DataMode.REPEATED ? Repetition.REPEATED : Repetition.OPTIONAL, field.getName(), types); case LIST: - throw new UnsupportedOperationException("Unsupported type " + minorType); + MaterializedField elementField = getDataField(field); + ListBuilder<GroupType> listBuilder = org.apache.parquet.schema.Types + .list(dataMode == DataMode.OPTIONAL ? Repetition.OPTIONAL : Repetition.REQUIRED); + addElementType(listBuilder, elementField); + GroupType listType = listBuilder.named(field.getName()); + return listType; case NULL: MaterializedField newField = field.withType( - TypeProtos.MajorType.newBuilder().setMinorType(MinorType.INT).setMode(DataMode.OPTIONAL).build()); + TypeProtos.MajorType.newBuilder().setMinorType(MinorType.INT).setMode(DataMode.OPTIONAL).build()); return getPrimitiveType(newField); default: return getPrimitiveType(field); } } + /** + * Helper method for conversion of map child + * fields. + * + * @param field map + * @return converted child fields + */ + private List<Type> getChildrenTypes(MaterializedField field) { + return field.getChildren().stream() + .map(this::getType) + .collect(Collectors.toList()); + } + + /** + * For list or repeated type possible child fields are {@link BaseRepeatedValueVector#DATA_VECTOR_NAME} + * and {@link BaseRepeatedValueVector#OFFSETS_VECTOR_NAME}. This method is used to find the data field. + * + * @param field parent repeated field + * @return child data field + */ + private MaterializedField getDataField(MaterializedField field) { + return field.getChildren().stream() + .filter(child -> BaseRepeatedValueVector.DATA_VECTOR_NAME.equals(child.getName())) + .findAny() + .orElseThrow(() -> new NoSuchElementException(String.format( + "Failed to get elementField '%s' from list: %s", + BaseRepeatedValueVector.DATA_VECTOR_NAME, field.getChildren()))); + } + + /** + * Adds element type to {@code listBuilder} based on Drill's + * {@code elementField}. 
+ * + * @param listBuilder list schema builder + * @param elementField Drill's type of list elements + */ + private void addElementType(ListBuilder<GroupType> listBuilder, MaterializedField elementField) { + if (elementField.getDataMode() == DataMode.REPEATED) { + ListBuilder<GroupType> inner = org.apache.parquet.schema.Types.requiredList(); + if (elementField.getType().getMinorType() == MinorType.MAP) { + GroupType mapGroupType = new GroupType(Repetition.REQUIRED, ELEMENT, getChildrenTypes(elementField)); + inner.element(mapGroupType); + } else { + MaterializedField child2 = getDataField(elementField); + addElementType(inner, child2); + } + listBuilder.setElementType(inner.named(ELEMENT)); + } else { + Type element = getType(elementField); + // element may have internal name '$data$', + // rename it to 'element' according to Parquet list schema + if (element.isPrimitive()) { + PrimitiveType primitiveElement = element.asPrimitiveType(); + element = new PrimitiveType( + primitiveElement.getRepetition(), + primitiveElement.getPrimitiveTypeName(), + ELEMENT, + primitiveElement.getOriginalType() + ); + } else { + GroupType groupElement = element.asGroupType(); + element = new GroupType(groupElement.getRepetition(), + ELEMENT, groupElement.getFields()); + } + listBuilder.element(element); + } + } + @Override public void checkForNewPartition(int index) { if (!hasPartitions) { @@ -423,8 +497,58 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { } consumer.endField(fieldName, fieldId); } + + @Override + public void writeListField() throws IOException { + if (reader.size() == 0) { + return; + } + consumer.startField(LIST, ZERO_IDX); + while (reader.next()) { + consumer.startGroup(); + consumer.startField(ELEMENT, ZERO_IDX); + + consumer.startGroup(); + for (FieldConverter converter : converters) { + converter.writeField(); + } + consumer.endGroup(); + + consumer.endField(ELEMENT, ZERO_IDX); + consumer.endGroup(); + } + consumer.endField(LIST, ZERO_IDX); 
+ } + } + + @Override + public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) { + return new RepeatedListParquetConverter(fieldId, fieldName, reader); } + public class RepeatedListParquetConverter extends FieldConverter { + private final FieldConverter converter; + + RepeatedListParquetConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + converter = EventBasedRecordWriter.getConverter(ParquetRecordWriter.this, 0, "", reader.reader()); + } + + @Override + public void writeField() throws IOException { + consumer.startField(fieldName, fieldId); + consumer.startField(LIST, ZERO_IDX); + while (reader.next()) { + consumer.startGroup(); + consumer.startField(ELEMENT, ZERO_IDX); + converter.writeListField(); + consumer.endField(ELEMENT, ZERO_IDX); + consumer.endGroup(); + } + consumer.endField(LIST, ZERO_IDX); + consumer.endField(fieldName, fieldId); + } + } @Override public void startRecord() throws IOException { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index 70fff7d..5fa618f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -17,29 +17,6 @@ */ package org.apache.drill.exec.physical.impl.writer; -import static org.apache.drill.exec.store.parquet.ParquetRecordWriter.DRILL_VERSION_PROPERTY; -import static org.apache.drill.test.TestBuilder.convertToLocalDateTime; -import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; -import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; -import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.file.Paths; -import java.time.LocalDate; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.calcite.util.Pair; import org.apache.drill.categories.ParquetTest; import org.apache.drill.categories.SlowTest; @@ -49,6 +26,8 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.fn.interp.TestConstantFolding; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.util.JsonStringArrayList; +import org.apache.drill.shaded.guava.com.google.common.base.Joiner; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import org.apache.drill.test.BaseTestQuery; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -68,8 +47,31 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.apache.drill.shaded.guava.com.google.common.base.Joiner; -import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.file.Paths; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Arrays.asList; +import static 
org.apache.drill.exec.store.parquet.ParquetRecordWriter.DRILL_VERSION_PROPERTY; +import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA; +import static org.apache.drill.test.TestBuilder.convertToLocalDateTime; +import static org.apache.drill.test.TestBuilder.mapOf; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) @Category({SlowTest.class, ParquetTest.class}) @@ -1272,6 +1274,62 @@ public class TestParquetWriter extends BaseTestQuery { } } + @Test + public void testCtasForList() throws Exception { + String tableName = "testCtasForList"; + try { + test("CREATE TABLE `%s`.`%s` AS SELECT l FROM cp.`jsoninput/input2.json`", DFS_TMP_SCHEMA, tableName); + testBuilder() + .sqlQuery("SELECT * FROM `%s`.`/%s` LIMIT 1", DFS_TMP_SCHEMA, tableName) + .unOrdered() + .baselineColumns("l") + .baselineValues(asList(4L, 2L)) + .go(); + } finally { + test("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName); + } + } + + @Test + public void testCtasForRepeatedList() throws Exception { + String tableName = "testCtasForRepeatedList"; + try { + test("CREATE TABLE `%s`.`%s` AS SELECT * FROM cp.`jsoninput/repeated_list_bug.json`", DFS_TMP_SCHEMA, tableName); + testBuilder() + .sqlQuery("SELECT rl FROM `%s`.`/%s`", DFS_TMP_SCHEMA, tableName) + .unOrdered() + .baselineColumns("rl") + .baselineValues(asList(asList(4L, 6L), asList(2L, 3L))) + .baselineValues(asList(asList(9L, 7L), asList(4L, 8L))) + .go(); + } finally { + 
test("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName); + } + } + + @Test + public void testCtasForRepeatedListOfMaps() throws Exception { + String tableName = "testCtasForRepeatedListOfMaps"; + try { + test("CREATE TABLE `%s`.`%s` AS SELECT * FROM cp.`jsoninput/repeated_list_of_maps.json`", DFS_TMP_SCHEMA, tableName); + testBuilder() + .sqlQuery("SELECT * FROM `%s`.`/%s`", DFS_TMP_SCHEMA, tableName) + .unOrdered() + .baselineColumns("rma") + .baselineValues(asList( + asList(mapOf("a", 1L, "b", "2"), mapOf("a", 2L, "b", "3")), + asList(mapOf("a", 3L, "b", "3")) + )) + .baselineValues(asList( + asList(mapOf("a", 4L, "b", "4"), mapOf("a", 5L, "b", "5"), mapOf("a", 6L, "b", "6")), + asList(mapOf("a", 7L, "b", "7")) + )) + .go(); + } finally { + test("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName); + } + } + /** * Checks that specified parquet table contains specified columns with specified types. * diff --git a/exec/java-exec/src/test/resources/jsoninput/repeated_list_of_maps.json b/exec/java-exec/src/test/resources/jsoninput/repeated_list_of_maps.json new file mode 100644 index 0000000..842468f --- /dev/null +++ b/exec/java-exec/src/test/resources/jsoninput/repeated_list_of_maps.json @@ -0,0 +1,2 @@ +{ "rma": [[{"a": 1, "b": "2"},{"a": 2, "b": "3"}], [{"a": 3, "b": "3"}]]} +{ "rma": [[{"a": 4, "b": "4"},{"a": 5, "b": "5"},{"a": 6, "b": "6"}], [{"a": 7, "b": "7"}]]} \ No newline at end of file