This is an automated email from the ASF dual-hosted git repository. bohdan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 65fb7dd DRILL-7509: Incorrect TupleSchema is created for DICT column when querying Parquet files 65fb7dd is described below commit 65fb7ddc144ecae5330c9325af63010748f74cdf Author: Bohdan Kazydub <bohdan.kazy...@gmail.com> AuthorDate: Mon Jan 13 18:58:29 2020 +0200 DRILL-7509: Incorrect TupleSchema is created for DICT column when querying Parquet files --- .../exec/store/parquet/FilterEvaluatorUtils.java | 20 ++++- .../exec/store/parquet/ParquetReaderUtility.java | 57 +++++++++---- .../store/parquet/ParquetTableMetadataUtils.java | 63 ++++++++++---- .../parquet/metadata/FileMetadataCollector.java | 95 ++++++++++++++++----- .../exec/store/parquet/metadata/Metadata.java | 4 +- .../exec/store/parquet/metadata/MetadataBase.java | 10 +++ .../store/parquet/metadata/MetadataVersion.java | 47 +++++++++- .../exec/store/parquet/metadata/Metadata_V4.java | 19 ++++- .../exec/store/parquet/TestParquetComplex.java | 11 +++ .../store/parquet/TestParquetMetadataVersion.java | 37 ++++++++ .../store/parquet/complex/repeated_struct.parquet | Bin 0 -> 608 bytes .../record/metadata/AbstractColumnMetadata.java | 2 +- .../record/metadata/AbstractMapColumnMetadata.java | 10 +-- .../exec/record/metadata/DictColumnMetadata.java | 21 ++++- .../drill/exec/record/metadata/MetadataUtils.java | 8 +- .../drill/metastore/util/SchemaPathUtils.java | 95 +++++++++++++++++++-- 16 files changed, 421 insertions(+), 78 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java index ffde9c3..80acade 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java @@ -113,7 +113,16 @@ public class FilterEvaluatorUtils { StatisticsProvider<T> 
rangeExprEvaluator = new StatisticsProvider(columnsStatistics, rowCount); rowsMatch = parquetPredicate.matches(rangeExprEvaluator); } - return rowsMatch == RowsMatch.ALL && isRepeated(schemaPathsInExpr, fileMetadata) ? RowsMatch.SOME : rowsMatch; + + if (rowsMatch == RowsMatch.ALL && isMetaNotApplicable(schemaPathsInExpr, fileMetadata)) { + rowsMatch = RowsMatch.SOME; + } + + return rowsMatch; + } + + private static boolean isMetaNotApplicable(Set<SchemaPath> schemaPathsInExpr, TupleMetadata fileMetadata) { + return isRepeated(schemaPathsInExpr, fileMetadata) || isDictOrRepeatedMapChild(schemaPathsInExpr, fileMetadata); } private static boolean isRepeated(Set<SchemaPath> fields, TupleMetadata fileMetadata) { @@ -127,6 +136,15 @@ public class FilterEvaluatorUtils { return false; } + private static boolean isDictOrRepeatedMapChild(Set<SchemaPath> fields, TupleMetadata fileMetadata) { + for (SchemaPath field : fields) { + if (SchemaPathUtils.isFieldNestedInDictOrRepeatedMap(field, fileMetadata)) { + return true; + } + } + return false; + } + /** * Search through a LogicalExpression, finding all internal schema path references and returning them in a set. 
*/ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java index 26021e3..d4267dd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java @@ -238,15 +238,14 @@ public class ParquetReaderUtility { } public static void correctDatesInMetadataCache(ParquetTableMetadataBase parquetTableMetadata) { + MetadataVersion metadataVersion = new MetadataVersion(parquetTableMetadata.getMetadataVersion()); DateCorruptionStatus cacheFileCanContainsCorruptDates = - new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 0)) >= 0 ? + metadataVersion.isAtLeast(3, 0) ? DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_UNCLEAR_TEST_VALUES; if (cacheFileCanContainsCorruptDates == DateCorruptionStatus.META_UNCLEAR_TEST_VALUES) { - boolean mdVersion_1_0 = new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion())); - boolean mdVersion_2_0 = new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion())); // Looking for the DATE data type of column names in the metadata cache file ("metadata_version" : "v2") String[] names = new String[0]; - if (mdVersion_2_0) { + if (metadataVersion.isEqualTo(2, 0)) { for (ColumnTypeMetadata_v2 columnTypeMetadata : ((ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) { if (OriginalType.DATE.equals(columnTypeMetadata.originalType)) { @@ -261,7 +260,7 @@ public class ParquetReaderUtility { Long rowCount = rowGroupMetadata.getRowCount(); for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) { // Setting Min/Max values for ParquetTableMetadata_v1 - if (mdVersion_1_0) { + if (metadataVersion.isEqualTo(1, 0)) { 
OriginalType originalType = columnMetadata.getOriginalType(); if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue(rowCount) && (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) { @@ -271,7 +270,7 @@ public class ParquetReaderUtility { } } // Setting Max values for ParquetTableMetadata_v2 - else if (mdVersion_2_0 && + else if (metadataVersion.isEqualTo(2, 0) && columnMetadata.getName() != null && Arrays.equals(columnMetadata.getName(), names) && columnMetadata.hasSingleValue(rowCount) && @@ -299,7 +298,8 @@ public class ParquetReaderUtility { boolean allowBinaryMetadata = allowBinaryMetadata(parquetTableMetadata.getDrillVersion(), readerConfig); // Setting Min / Max values for ParquetTableMetadata_v1 - if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) { + MetadataVersion metadataVersion = new MetadataVersion(parquetTableMetadata.getMetadataVersion()); + if (metadataVersion.isEqualTo(1, 0)) { for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { for (RowGroupMetadata rowGroupMetadata : file.getRowGroups()) { Long rowCount = rowGroupMetadata.getRowCount(); @@ -320,7 +320,7 @@ public class ParquetReaderUtility { int maxNumColumns = 0; // Setting Min / Max values for V2, V3 and V4 versions; for versions V3_3 and above need to do decoding - boolean needDecoding = new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) >= 0; + boolean needDecoding = metadataVersion.isAtLeast(3, 3); for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { if ( timer != null ) { // for debugging only maxRowGroups = Math.max(maxRowGroups, file.getRowGroups().size()); @@ -718,15 +718,17 @@ public class ParquetReaderUtility { /** * Converts list of {@link OriginalType}s to list of {@link org.apache.drill.common.types.TypeProtos.MajorType}s. 
- * <b>NOTE</b>: current implementation cares about {@link OriginalType#MAP} only - * converting it to {@link org.apache.drill.common.types.TypeProtos.MinorType#DICT}. + * <b>NOTE</b>: current implementation cares about {@link OriginalType#MAP} and {@link OriginalType#LIST} only + * converting it to {@link org.apache.drill.common.types.TypeProtos.MinorType#DICT} + * and {@link org.apache.drill.common.types.TypeProtos.MinorType#LIST} respectively. * Other original types are converted to {@code null}, because there is no certain correspondence - * (and, actually, a need because these types are used to differentiate between Drill's MAP and DICT types + * (and, actually, a need because these types are used to differentiate between Drill's MAP and DICT (and arrays of thereof) types * when constructing {@link org.apache.drill.exec.record.metadata.TupleSchema}) between these two. * * @param originalTypes list of Parquet's types * @return list containing either {@code null} or type with minor - * type {@link org.apache.drill.common.types.TypeProtos.MinorType#DICT} values + * type {@link org.apache.drill.common.types.TypeProtos.MinorType#DICT} or + * {@link org.apache.drill.common.types.TypeProtos.MinorType#LIST} values */ public static List<TypeProtos.MajorType> getComplexTypes(List<OriginalType> originalTypes) { List<TypeProtos.MajorType> result = new ArrayList<>(); @@ -735,11 +737,9 @@ public class ParquetReaderUtility { } for (OriginalType type : originalTypes) { if (type == OriginalType.MAP) { - TypeProtos.MajorType drillType = TypeProtos.MajorType.newBuilder() - .setMinorType(TypeProtos.MinorType.DICT) - .setMode(TypeProtos.DataMode.OPTIONAL) - .build(); - result.add(drillType); + result.add(Types.required(TypeProtos.MinorType.DICT)); + } else if (type == OriginalType.LIST) { + result.add(Types.required(TypeProtos.MinorType.LIST)); } else { result.add(null); } @@ -807,4 +807,27 @@ public class ParquetReaderUtility { } return false; } + + /** + * Converts Parquet's 
{@link Type.Repetition} to Drill's {@link TypeProtos.DataMode}. + * @param repetition repetition to be converted + * @return data mode corresponding to Parquet's repetition + */ + public static TypeProtos.DataMode getDataMode(Type.Repetition repetition) { + TypeProtos.DataMode mode; + switch (repetition) { + case REPEATED: + mode = TypeProtos.DataMode.REPEATED; + break; + case OPTIONAL: + mode = TypeProtos.DataMode.OPTIONAL; + break; + case REQUIRED: + mode = TypeProtos.DataMode.REQUIRED; + break; + default: + throw new IllegalArgumentException(String.format("Unknown Repetition: %s.", repetition)); + } + return mode; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java index 68254d3..0bad959 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java @@ -472,7 +472,7 @@ public class ParquetTableMetadataUtils { public static Map<SchemaPath, TypeProtos.MajorType> getRowGroupFields( MetadataBase.ParquetTableMetadataBase parquetTableMetadata, MetadataBase.RowGroupMetadata rowGroup) { Map<SchemaPath, TypeProtos.MajorType> columns = new LinkedHashMap<>(); - if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(4, 0)) > 0 + if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).isHigherThan(4, 0) && !((Metadata_V4.ParquetTableMetadata_v4) parquetTableMetadata).isAllColumnsInteresting()) { // adds non-interesting fields from table metadata for (MetadataBase.ColumnTypeMetadata columnTypeMetadata : parquetTableMetadata.getColumnTypeInfoList()) { @@ -507,27 +507,61 @@ public class ParquetTableMetadataUtils { MetadataBase.ParquetTableMetadataBase parquetTableMetadata) { int precision = 0; int scale = 0; - int 
definitionLevel = 1; - int repetitionLevel = 0; MetadataVersion metadataVersion = new MetadataVersion(parquetTableMetadata.getMetadataVersion()); // only ColumnTypeMetadata_v3 and ColumnTypeMetadata_v4 store information about scale, precision, repetition level and definition level - if (parquetTableMetadata.hasColumnMetadata() && (metadataVersion.compareTo(new MetadataVersion(3, 0)) >= 0)) { + if (metadataVersion.isAtLeast(3, 0)) { scale = parquetTableMetadata.getScale(name); precision = parquetTableMetadata.getPrecision(name); - repetitionLevel = parquetTableMetadata.getRepetitionLevel(name); - definitionLevel = parquetTableMetadata.getDefinitionLevel(name); } + + TypeProtos.DataMode mode = getDataMode(parquetTableMetadata, metadataVersion, name); + return TypeProtos.MajorType.newBuilder(ParquetReaderUtility.getType(primitiveType, originalType, precision, scale)) + .setMode(mode) + .build(); + } + + /** + * Obtain data mode from table metadata for a column. Algorithm for retrieving data mode depends on metadata version: + * <ul> + * <li>starting from version {@code 4.2}, Parquet's {@link org.apache.parquet.schema.Type.Repetition} + * is stored in table metadata itself;</li> + * <li>starting from {@code 3.0} to {@code 4.2} (exclusively) the data mode is + * computed based on max {@code definition} and {@code repetition} levels + * ({@link MetadataBase.ParquetTableMetadataBase#getDefinitionLevel(String[])} and + * {@link MetadataBase.ParquetTableMetadataBase#getRepetitionLevel(String[])} respectively) + * obtained from Parquet's schema; + * + * <p><strong>Note:</strong> this computation may lead to erroneous results, + * when there are few nesting levels.</p> + * </li> + * <li>prior to {@code 3.0} {@code DataMode.OPTIONAL} is returned.</li> + * </ul> + * @param tableMetadata Parquet table metadata + * @param metadataVersion version of Parquet table metadata + * @param name (leaf) column to obtain data mode for + * @return data mode of the specified column + */ + 
private static TypeProtos.DataMode getDataMode(MetadataBase.ParquetTableMetadataBase tableMetadata, + MetadataVersion metadataVersion, String[] name) { TypeProtos.DataMode mode; - if (repetitionLevel >= 1) { - mode = TypeProtos.DataMode.REPEATED; - } else if (repetitionLevel == 0 && definitionLevel == 0) { - mode = TypeProtos.DataMode.REQUIRED; + if (metadataVersion.isAtLeast(4, 2)) { + mode = ParquetReaderUtility.getDataMode(tableMetadata.getRepetition(name)); + } else if (metadataVersion.isAtLeast(3, 0)) { + int definitionLevel = tableMetadata.getDefinitionLevel(name); + int repetitionLevel = tableMetadata.getRepetitionLevel(name); + + if (repetitionLevel >= 1) { + mode = TypeProtos.DataMode.REPEATED; + } else if (repetitionLevel == 0 && definitionLevel == 0) { + mode = TypeProtos.DataMode.REQUIRED; + } else { + mode = TypeProtos.DataMode.OPTIONAL; + } } else { mode = TypeProtos.DataMode.OPTIONAL; } - return TypeProtos.MajorType.newBuilder(ParquetReaderUtility.getType(primitiveType, originalType, precision, scale)) - .setMode(mode) - .build(); + + return mode; } /** @@ -547,8 +581,7 @@ public class ParquetTableMetadataUtils { Map<SchemaPath, TypeProtos.MajorType> columns = new LinkedHashMap<>(); MetadataVersion metadataVersion = new MetadataVersion(parquetTableMetadata.getMetadataVersion()); - boolean hasParentTypes = parquetTableMetadata.hasColumnMetadata() - && metadataVersion.compareTo(new MetadataVersion(4, 1)) >= 0; + boolean hasParentTypes = metadataVersion.isAtLeast(4, 1); if (!hasParentTypes) { return Collections.emptyMap(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java index 9b74c6d..c79996b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java @@ -185,6 +185,7 @@ public class FileMetadataCollector { .totalNullCount(0) .interesting(false) .parentTypes(colTypeInfo.parentTypes) + .repetition(colTypeInfo.repetition) .build(); Metadata_V4.ColumnTypeMetadata_v4.Key columnTypeMetadataKey = new Metadata_V4.ColumnTypeMetadata_v4.Key(columnTypeMetadata.name); @@ -254,41 +255,91 @@ public class FileMetadataCollector { int scale; int repetitionLevel; int definitionLevel; - - ColTypeInfo(OriginalType originalType, List<OriginalType> parentTypes, - int precision, int scale, int repetitionLevel, int definitionLevel) { - this.originalType = originalType; - this.parentTypes = parentTypes; - this.precision = precision; - this.scale = scale; - this.repetitionLevel = repetitionLevel; - this.definitionLevel = definitionLevel; - } + Type.Repetition repetition; static ColTypeInfo of(MessageType schema, Type type, String[] path, int depth, List<OriginalType> parentTypes) { if (type.isPrimitive()) { - PrimitiveType primitiveType = (PrimitiveType) type; - int precision = 0; - int scale = 0; - if (primitiveType.getDecimalMetadata() != null) { - precision = primitiveType.getDecimalMetadata().getPrecision(); - scale = primitiveType.getDecimalMetadata().getScale(); - } - - int repetitionLevel = schema.getMaxRepetitionLevel(path); - int definitionLevel = schema.getMaxDefinitionLevel(path); - - return new ColTypeInfo(type.getOriginalType(), parentTypes, precision, scale, repetitionLevel, definitionLevel); + return createColTypeInfo(type.asPrimitiveType(), schema, path, parentTypes); } + Type t = ((GroupType) type).getType(path[depth]); if (!t.isPrimitive()) { OriginalType originalType = t.getOriginalType(); if (originalType == OriginalType.MAP && !ParquetReaderUtility.isLogicalMapType(t.asGroupType())) { originalType = null; + } else if (originalType == OriginalType.LIST && !ParquetReaderUtility.isLogicalListType(t.asGroupType())) { + 
originalType = null; } parentTypes.add(originalType); } return of(schema, t, path, depth + 1, parentTypes); } + + private static ColTypeInfo createColTypeInfo(PrimitiveType type, MessageType schema, + String[] path, List<OriginalType> parentTypes) { + int precision = 0; + int scale = 0; + if (type.getDecimalMetadata() != null) { + precision = type.getDecimalMetadata().getPrecision(); + scale = type.getDecimalMetadata().getScale(); + } + + int repetitionLevel = schema.getMaxRepetitionLevel(path); + int definitionLevel = schema.getMaxDefinitionLevel(path); + + Type.Repetition repetition; + // Check if the primitive has LIST as parent, if it does - this is an array of primitives. + // (See ParquetReaderUtility#isLogicalListType(GroupType) for the REPEATED field structure.) + int probableListIndex = parentTypes.size() - 2; + if (probableListIndex >= 0 && parentTypes.get(probableListIndex) == OriginalType.LIST) { + repetition = Type.Repetition.REPEATED; + } else { + repetition = type.getRepetition(); + } + + return new ColTypeInfo() + .setOriginalType(type.getOriginalType()) + .setParentTypes(parentTypes) + .setPrecision(precision) + .setScale(scale) + .setRepetitionLevel(repetitionLevel) + .setDefinitionLevel(definitionLevel) + .setRepetition(repetition); + } + + private ColTypeInfo setOriginalType(OriginalType originalType) { + this.originalType = originalType; + return this; + } + + private ColTypeInfo setParentTypes(List<OriginalType> parentTypes) { + this.parentTypes = parentTypes; + return this; + } + + private ColTypeInfo setPrecision(int precision) { + this.precision = precision; + return this; + } + + private ColTypeInfo setScale(int scale) { + this.scale = scale; + return this; + } + + private ColTypeInfo setRepetitionLevel(int repetitionLevel) { + this.repetitionLevel = repetitionLevel; + return this; + } + + private ColTypeInfo setDefinitionLevel(int definitionLevel) { + this.definitionLevel = definitionLevel; + return this; + } + + private ColTypeInfo 
setRepetition(Type.Repetition repetition) { + this.repetition = repetition; + return this; + } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index 03d1fd0..92a1adb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -592,7 +592,7 @@ public class Metadata { } else { if (isFileMetadata) { parquetTableMetadata.assignFiles((mapper.readValue(is, FileMetadata.class)).getFiles()); - if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(4, 0)) >= 0) { + if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).isAtLeast(4, 0)) { ((ParquetTableMetadata_v4) parquetTableMetadata).updateRelativePaths(metadataParentDirPath); } @@ -606,7 +606,7 @@ public class Metadata { parquetTableMetadata = new ParquetTableMetadata_v4(metadataSummary); } else { parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class); - if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 0)) >= 0) { + if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).isAtLeast(3, 0)) { ((Metadata_V3.ParquetTableMetadata_v3) parquetTableMetadata).updateRelativePaths(metadataParentDirPath); } if (!alreadyCheckedModification && tableModified((parquetTableMetadata.getDirectories()), path, metadataParentDir, metaContext, fs)) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java index 126aa6d..14b2703 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.hadoop.fs.Path; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; import java.util.List; import java.util.Map; @@ -34,6 +35,8 @@ import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Const import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V3_2; import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V3_3; import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V4; +import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V4_1; +import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V4_2; public class MetadataBase { @@ -56,6 +59,8 @@ public class MetadataBase { @JsonSubTypes.Type(value = Metadata_V3.ParquetTableMetadata_v3.class, name = V3_2), @JsonSubTypes.Type(value = Metadata_V3.ParquetTableMetadata_v3.class, name = V3_3), @JsonSubTypes.Type(value = Metadata_V4.ParquetTableMetadata_v4.class, name = V4), + @JsonSubTypes.Type(value = Metadata_V4.ParquetTableMetadata_v4.class, name = V4_1), + @JsonSubTypes.Type(value = Metadata_V4.ParquetTableMetadata_v4.class, name = V4_2), }) public static abstract class ParquetTableMetadataBase { @@ -90,6 +95,11 @@ public class MetadataBase { @JsonIgnore public abstract String getMetadataVersion(); @JsonIgnore public abstract List<? 
extends ColumnTypeMetadata> getColumnTypeInfoList(); + + @JsonIgnore + public Type.Repetition getRepetition(String[] columnName) { + return null; + } } public static abstract class ParquetFileMetadata { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java index 46e4c57..1ffe59d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java @@ -103,6 +103,45 @@ public class MetadataVersion implements Comparable<MetadataVersion> { .result(); } + /** + * Check if this version is at least (equals or higher) the one + * identified by {@code major} and {@code minor} versions integer literals. + * + * @param major major version + * @param minor minor version + * @return {@literal true} if the version is equal to or higher than + * the one it is being checked against + */ + public boolean isAtLeast(int major, int minor) { + return this.major > major || (this.major == major && this.minor >= minor); + } + + /** + * Check if the version is the same as the one identified by + * {@code major} and {@code minor} versions integer literals. + * + * @param major major version + * @param minor minor version + * @return {@literal true} if the version is equal to the one + * it is being checked against + */ + public boolean isEqualTo(int major, int minor) { + return this.major == major && this.minor == minor; + } + + /** + * Check if this version comes after the one identified by {@code major} + * and {@code minor} versions integer literals. That is, this one was introduced later. 
+ * + * @param major major version + * @param minor minor version + * @return {@literal true} if the version is defined later than + * the one it is being checked against + */ + public boolean isHigherThan(int major, int minor) { + return this.major > major || (this.major == major && this.minor > minor); + } + /** * Supported metadata versions. * <p> @@ -157,6 +196,11 @@ public class MetadataVersion implements Comparable<MetadataVersion> { public static final String V4_1 = "4.1"; /** + * Version 4.2: Added {@link org.apache.parquet.schema.Type.Repetition} to {@link Metadata_V4.ColumnTypeMetadata_v4}. + */ + public static final String V4_2 = "4.2"; + + /** * All historical versions of the Drill metadata cache files. In case of introducing a new parquet metadata version * please follow the {@link MetadataVersion#FORMAT}. */ @@ -168,7 +212,8 @@ public class MetadataVersion implements Comparable<MetadataVersion> { new MetadataVersion(V3_2), new MetadataVersion(V3_3), new MetadataVersion(V4), - new MetadataVersion(V4_1) + new MetadataVersion(V4_1), + new MetadataVersion(V4_2) ); /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java index d17cd31..12d8f30 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java @@ -26,6 +26,7 @@ import org.apache.drill.common.util.DrillVersionInfo; import org.apache.hadoop.fs.Path; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; import java.util.ArrayList; import java.util.Collections; @@ -38,7 +39,7 @@ import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnTy import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata; import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase; import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata; -import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V4; +import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V4_2; public class Metadata_V4 { @@ -158,6 +159,11 @@ public class Metadata_V4 { return metadataSummary.drillVersion; } + @Override + public Type.Repetition getRepetition(String[] columnName) { + return getColumnTypeInfo(columnName).repetition; + } + public MetadataSummary getSummary() { return metadataSummary; } @@ -309,6 +315,8 @@ public class Metadata_V4 { public long totalNullCount = 0; @JsonProperty public boolean isInteresting = false; + @JsonProperty + public Type.Repetition repetition; // Key to find by name only @JsonIgnore @@ -329,6 +337,7 @@ public class Metadata_V4 { this.totalNullCount = builder.totalNullCount; this.isInteresting = builder.isInteresting; this.parentTypes = Collections.unmodifiableList(builder.parentTypes); + this.repetition = builder.repetition; } @JsonIgnore @@ -413,6 +422,7 @@ public class Metadata_V4 { private int definitionLevel; private long totalNullCount; private boolean isInteresting; + private Type.Repetition repetition; public Builder name(String[] name) { this.name = name; @@ -464,6 +474,11 @@ public class Metadata_V4 { return this; } + public Builder repetition(Type.Repetition repetition) { + this.repetition = repetition; + return this; + } + public ColumnTypeMetadata_v4 build() { return new ColumnTypeMetadata_v4(this); } @@ -483,7 +498,7 @@ public class Metadata_V4 { } } - @JsonTypeName(V4) + @JsonTypeName(V4_2) public static class MetadataSummary { @JsonProperty(value = "metadata_version") diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java index 6e02004..b496f33 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java @@ -875,4 +875,15 @@ public class TestParquetComplex extends BaseTestQuery { .baselineValues(3L, 0L, 3L, 3L) .go(); } + + @Test // DRILL-7509 + public void selectRepeatedMapWithFilter() throws Exception { + String query = "select id, struct_array[1].b as b from cp.`store/parquet/complex/repeated_struct.parquet` where struct_array[1].b is null"; + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("id", "b") + .baselineValues(2, null) + .go(); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataVersion.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataVersion.java index f880126..4ff2802 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataVersion.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataVersion.java @@ -26,6 +26,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @Category({ParquetTest.class, UnlikelyTest.class}) @@ -123,4 +124,40 @@ public class TestParquetMetadataVersion extends BaseTest { throw e; } } + + @Test + public void testAtLeast() { + MetadataVersion version = new MetadataVersion("v4.2"); + assertTrue(version.isAtLeast(4, 0)); + assertTrue(version.isAtLeast(4, 1)); + assertTrue(version.isAtLeast(4, 2)); + assertFalse(version.isAtLeast(4, 3)); + assertFalse(version.isAtLeast(5, 1)); + 
assertTrue(version.isAtLeast(3, 0)); + assertTrue(version.isAtLeast(1, 0)); + } + + @Test + public void testAfter() { + MetadataVersion version = new MetadataVersion(4, 1); + assertFalse(version.isHigherThan(4,1)); + assertFalse(version.isHigherThan(4,3)); + assertFalse(version.isHigherThan(5,0)); + assertTrue(version.isHigherThan(4, 0)); + assertTrue(version.isHigherThan(3, 0)); + assertTrue(version.isHigherThan(2, 1)); + assertTrue(version.isHigherThan(1, 3)); + assertTrue(version.isHigherThan(1, 0)); + } + + @Test + public void testIsEqual() { + MetadataVersion version = new MetadataVersion(3, 2); + assertTrue(version.isEqualTo(3, 2)); + assertFalse(version.isEqualTo(4, 2)); + assertFalse(version.isEqualTo(2, 3)); + assertFalse(version.isEqualTo(1, 0)); + assertFalse(version.isEqualTo(3, 1)); + assertFalse(version.isEqualTo(1, 2)); + } } diff --git a/exec/java-exec/src/test/resources/store/parquet/complex/repeated_struct.parquet b/exec/java-exec/src/test/resources/store/parquet/complex/repeated_struct.parquet new file mode 100644 index 0000000..c46dc90 Binary files /dev/null and b/exec/java-exec/src/test/resources/store/parquet/complex/repeated_struct.parquet differ diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java index 99bcdb3..0c8a8bf 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java @@ -306,7 +306,7 @@ public abstract class AbstractColumnMetadata extends AbstractPropertied implemen builder.append(typeString()); // Drill does not have nullability notion for complex types - if (!isNullable() && !isArray() && !isMap()) { + if (!isNullable() && !isArray() && !isMap() && !isDict()) { builder.append(" NOT NULL"); } diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java index 6f79336..cecfdd9 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java @@ -47,21 +47,21 @@ public abstract class AbstractMapColumnMetadata extends AbstractColumnMetadata { * the children) of the materialized field provided. * * @param schema the schema to use - * @param mapSchema parent schema + * @param tupleSchema parent schema */ - AbstractMapColumnMetadata(MaterializedField schema, TupleSchema mapSchema) { + AbstractMapColumnMetadata(MaterializedField schema, TupleSchema tupleSchema) { super(schema); - if (mapSchema == null) { + if (tupleSchema == null) { this.schema = new TupleSchema(); } else { - this.schema = mapSchema; + this.schema = tupleSchema; } this.schema.bind(this); } public AbstractMapColumnMetadata(AbstractMapColumnMetadata from) { super(from); - schema = (TupleSchema) from.schema.copy(); + schema = from.schema.copy(); } public AbstractMapColumnMetadata(String name, MinorType type, DataMode mode, TupleSchema schema) { diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java index 50957e8..66ba63f 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.record.metadata; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.vector.complex.DictVector; public class DictColumnMetadata extends 
AbstractMapColumnMetadata { @@ -31,6 +32,14 @@ public class DictColumnMetadata extends AbstractMapColumnMetadata { this(schema, null); } + public DictColumnMetadata(String name, TypeProtos.DataMode mode) { + this(name, mode, null); + } + + public DictColumnMetadata(DictColumnMetadata from) { + super(from); + } + /** * Build a dict column metadata by cloning the type information (but not * the children) of the materialized field provided. @@ -42,12 +51,16 @@ public class DictColumnMetadata extends AbstractMapColumnMetadata { super(schema, tupleSchema); } - public DictColumnMetadata(DictColumnMetadata from) { - super(from); + DictColumnMetadata(String name, TypeProtos.DataMode mode, TupleSchema tupleSchema) { + super(name, TypeProtos.MinorType.DICT, mode, tupleSchema); + } + + public ColumnMetadata keyColumnMetadata() { + return schema.metadata(DictVector.FIELD_KEY_NAME); } - public DictColumnMetadata(String name, TypeProtos.DataMode mode, TupleSchema mapSchema) { - super(name, TypeProtos.MinorType.DICT, mode, mapSchema); + public ColumnMetadata valueColumnMetadata() { + return schema.metadata(DictVector.FIELD_VALUE_NAME); } @Override diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java index 05936a3..03f304c 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java @@ -160,8 +160,8 @@ public class MetadataUtils { } } - public static DictColumnMetadata newDict(String name, TupleMetadata schema) { - return new DictColumnMetadata(name, DataMode.REQUIRED, (TupleSchema) schema); + public static DictColumnMetadata newDict(String name) { + return new DictColumnMetadata(name, DataMode.REQUIRED); } public static VariantColumnMetadata newVariant(MaterializedField field, VariantSchema schema) { @@ -187,6 +187,10 @@ public 
class MetadataUtils { return new MapColumnMetadata(name, DataMode.REPEATED, (TupleSchema) schema); } + public static DictColumnMetadata newDictArray(String name) { + return new DictColumnMetadata(name, DataMode.REPEATED); + } + public static PrimitiveColumnMetadata newScalar(String name, MinorType type, DataMode mode) { assert isScalar(type); diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java index 4643c31..a89d435 100644 --- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java +++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java @@ -21,8 +21,10 @@ import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.DictColumnMetadata; import org.apache.drill.exec.record.metadata.MetadataUtils; import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata; import org.apache.drill.exec.record.metadata.TupleMetadata; @@ -49,8 +51,7 @@ public class SchemaPathUtils { ColumnMetadata colMetadata = schema.metadata(colPath.getPath()); while (!colPath.isLastPath() && colMetadata != null) { if (colMetadata.isDict()) { - // get dict's value field metadata - colMetadata = colMetadata.tupleSchema().metadata(0).tupleSchema().metadata(1); + colMetadata = ((DictColumnMetadata) colMetadata).valueColumnMetadata(); break; } if (!colMetadata.isMap()) { @@ -64,6 +65,40 @@ public class SchemaPathUtils { } /** + * Checks if field identified by the schema path is child in either {@code DICT} or {@code 
REPEATED MAP}. + * For such fields, nested in {@code DICT} or {@code REPEATED MAP}, + * filters can't be removed based on Parquet statistics. + * + * <p>The need for the check arises because statistics data is not obtained for such fields, as their representation + * differs from the 'canonical' one. For example, field {@code `a`} in Parquet's {@code STRUCT ARRAY} is represented + * as {@code `struct_array`.`bag`.`array_element`.`a`}, but once it is used in a filter, {@code ... WHERE struct_array[0].a = 1}, + * it has a different representation (with indexes stripped): {@code `struct_array`.`a`}, which is not present in statistics. + * The same happens with DICT's {@code value}: for {@code SELECT ... WHERE dict_col['a'] = 0}, statistics exist for + * {@code `dict_col`.`key_value`.`value`}, but the field in the filter is translated to {@code `dict_col`.`a`} and hence it is + * considered not present in statistics. If such a field (like the ones shown in the examples) is {@code OPTIONAL INT}, then + * it is considered not present in the table and is treated as {@code NULL}. This method is used to avoid that situation.</p> + * + * @param schemaPath schema path used in the filter + * @param schema schema containing all the fields in the file + * @return {@literal true} if the field is nested inside {@code DICT} (is {@code `key`} or {@code `value`}) + * or inside a {@code REPEATED MAP} field, {@literal false} otherwise. 
+ */ + public static boolean isFieldNestedInDictOrRepeatedMap(SchemaPath schemaPath, TupleMetadata schema) { + PathSegment.NameSegment colPath = schemaPath.getUnIndexed().getRootSegment(); + ColumnMetadata colMetadata = schema.metadata(colPath.getPath()); + while (!colPath.isLastPath() && colMetadata != null) { + if (colMetadata.isDict() || (colMetadata.isMap() && Types.isRepeated(colMetadata.majorType()))) { + return true; + } else if (!colMetadata.isMap()) { + break; + } + colPath = (PathSegment.NameSegment) colPath.getChild(); + colMetadata = colMetadata.tupleSchema().metadata(colPath.getPath()); + } + return false; + } + + /** * Adds column with specified schema path and type into specified {@code TupleMetadata schema}. * For the case when specified {@link SchemaPath} has children, corresponding maps will be created * in the {@code TupleMetadata schema} and the last child of the map will have specified type. @@ -73,23 +108,71 @@ public class SchemaPathUtils { * @param type type of the column which should be added * @param types list of column's parent types */ - public static void addColumnMetadata(TupleMetadata schema, SchemaPath schemaPath, TypeProtos.MajorType type, Map<SchemaPath, TypeProtos.MajorType> types) { + public static void addColumnMetadata(TupleMetadata schema, SchemaPath schemaPath, + TypeProtos.MajorType type, Map<SchemaPath, TypeProtos.MajorType> types) { PathSegment.NameSegment colPath = schemaPath.getUnIndexed().getRootSegment(); List<String> names = new ArrayList<>(types.size()); + // Used in case of LIST; defined here to avoid many instantiations inside while-loop + List<String> nextNames = new ArrayList<>(names.size()); ColumnMetadata colMetadata; while (!colPath.isLastPath()) { names.add(colPath.getPath()); colMetadata = schema.metadata(colPath.getPath()); TypeProtos.MajorType pathType = types.get(SchemaPath.getCompoundPath(names.toArray(new String[0]))); + + // The following types, DICT and LIST, contain a nested segment in Parquet 
representation + // (see ParquetReaderUtility#isLogicalListType(GroupType) and ParquetReaderUtility#isLogicalMapType(GroupType)) + // which we should skip when creating corresponding TupleMetadata representation. Additionally, + // there is a need to track if the field is LIST to create appropriate column metadata based + // on the info: whether to create singular MAP/DICT or MAP/DICT array. + boolean isDict = pathType != null && pathType.getMinorType() == TypeProtos.MinorType.DICT; + boolean isList = pathType != null && pathType.getMinorType() == TypeProtos.MinorType.LIST; + String name = colPath.getPath(); + + if (isList) { + nextNames.clear(); + nextNames.addAll(names); + + // Parquet's LIST group (which represents an array) has + // an inner group (bagSegment) which we want to skip here + PathSegment.NameSegment bagSegment = colPath.getChild().getNameSegment(); + PathSegment.NameSegment elementSegment = bagSegment.getChild().getNameSegment(); + nextNames.add(bagSegment.getPath()); + nextNames.add(elementSegment.getPath()); + + pathType = types.get(SchemaPath.getCompoundPath(nextNames.toArray(new String[0]))); + + if (pathType == null && colPath.getChild().getChild().isLastPath()) { + // The list is actually a repeated primitive: + // will be handled after the while statement + break; + } + + colPath = elementSegment; + + names.add(bagSegment.getPath()); + names.add(elementSegment.getPath()); + + // Check whether LIST's element type is DICT + isDict = pathType != null && pathType.getMinorType() == TypeProtos.MinorType.DICT; + } + if (colMetadata == null) { - if (pathType != null && pathType.getMinorType() == TypeProtos.MinorType.DICT) { - colMetadata = MetadataUtils.newDict(colPath.getPath(), null); + if (isDict) { + colMetadata = isList ? MetadataUtils.newDictArray(name) : MetadataUtils.newDict(name); } else { - colMetadata = MetadataUtils.newMap(colPath.getPath(), null); + colMetadata = isList ? 
MetadataUtils.newMapArray(name, null) : MetadataUtils.newMap(name, null); } schema.addColumn(colMetadata); } + if (isDict) { + // Parquet's MAP (which corresponds to DICT in Drill) has + // an inner group which we want to skip here + colPath = (PathSegment.NameSegment) colPath.getChild(); + names.add(colPath.getPath()); + } + if (!colMetadata.isMap() && !colMetadata.isDict()) { throw new DrillRuntimeException(String.format("Expected map or dict, but was %s", colMetadata.majorType())); }