This is an automated email from the ASF dual-hosted git repository.

bohdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 65fb7dd  DRILL-7509: Incorrect TupleSchema is created for DICT column 
when querying Parquet files
65fb7dd is described below

commit 65fb7ddc144ecae5330c9325af63010748f74cdf
Author: Bohdan Kazydub <bohdan.kazy...@gmail.com>
AuthorDate: Mon Jan 13 18:58:29 2020 +0200

    DRILL-7509: Incorrect TupleSchema is created for DICT column when querying 
Parquet files
---
 .../exec/store/parquet/FilterEvaluatorUtils.java   |  20 ++++-
 .../exec/store/parquet/ParquetReaderUtility.java   |  57 +++++++++----
 .../store/parquet/ParquetTableMetadataUtils.java   |  63 ++++++++++----
 .../parquet/metadata/FileMetadataCollector.java    |  95 ++++++++++++++++-----
 .../exec/store/parquet/metadata/Metadata.java      |   4 +-
 .../exec/store/parquet/metadata/MetadataBase.java  |  10 +++
 .../store/parquet/metadata/MetadataVersion.java    |  47 +++++++++-
 .../exec/store/parquet/metadata/Metadata_V4.java   |  19 ++++-
 .../exec/store/parquet/TestParquetComplex.java     |  11 +++
 .../store/parquet/TestParquetMetadataVersion.java  |  37 ++++++++
 .../store/parquet/complex/repeated_struct.parquet  | Bin 0 -> 608 bytes
 .../record/metadata/AbstractColumnMetadata.java    |   2 +-
 .../record/metadata/AbstractMapColumnMetadata.java |  10 +--
 .../exec/record/metadata/DictColumnMetadata.java   |  21 ++++-
 .../drill/exec/record/metadata/MetadataUtils.java  |   8 +-
 .../drill/metastore/util/SchemaPathUtils.java      |  95 +++++++++++++++++++--
 16 files changed, 421 insertions(+), 78 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
index ffde9c3..80acade 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
@@ -113,7 +113,16 @@ public class FilterEvaluatorUtils {
       StatisticsProvider<T> rangeExprEvaluator = new 
StatisticsProvider(columnsStatistics, rowCount);
       rowsMatch = parquetPredicate.matches(rangeExprEvaluator);
     }
-    return rowsMatch == RowsMatch.ALL && isRepeated(schemaPathsInExpr, 
fileMetadata) ? RowsMatch.SOME : rowsMatch;
+
+    if (rowsMatch == RowsMatch.ALL && isMetaNotApplicable(schemaPathsInExpr, 
fileMetadata)) {
+      rowsMatch = RowsMatch.SOME;
+    }
+
+    return rowsMatch;
+  }
+
+  private static boolean isMetaNotApplicable(Set<SchemaPath> 
schemaPathsInExpr, TupleMetadata fileMetadata) {
+    return isRepeated(schemaPathsInExpr, fileMetadata) || 
isDictOrRepeatedMapChild(schemaPathsInExpr, fileMetadata);
   }
 
   private static boolean isRepeated(Set<SchemaPath> fields, TupleMetadata 
fileMetadata) {
@@ -127,6 +136,15 @@ public class FilterEvaluatorUtils {
     return false;
   }
 
+  private static boolean isDictOrRepeatedMapChild(Set<SchemaPath> fields, 
TupleMetadata fileMetadata) {
+    for (SchemaPath field : fields) {
+      if (SchemaPathUtils.isFieldNestedInDictOrRepeatedMap(field, 
fileMetadata)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Search through a LogicalExpression, finding all internal schema path 
references and returning them in a set.
    */
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 26021e3..d4267dd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -238,15 +238,14 @@ public class ParquetReaderUtility {
   }
 
   public static void correctDatesInMetadataCache(ParquetTableMetadataBase 
parquetTableMetadata) {
+    MetadataVersion metadataVersion = new 
MetadataVersion(parquetTableMetadata.getMetadataVersion());
     DateCorruptionStatus cacheFileCanContainsCorruptDates =
-        new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new 
MetadataVersion(3, 0)) >= 0 ?
+        metadataVersion.isAtLeast(3, 0) ?
         DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : 
DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;
     if (cacheFileCanContainsCorruptDates == 
DateCorruptionStatus.META_UNCLEAR_TEST_VALUES) {
-      boolean mdVersion_1_0 = new MetadataVersion(1, 0).equals(new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()));
-      boolean mdVersion_2_0 = new MetadataVersion(2, 0).equals(new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()));
       // Looking for the DATE data type of column names in the metadata cache 
file ("metadata_version" : "v2")
       String[] names = new String[0];
-      if (mdVersion_2_0) {
+      if (metadataVersion.isEqualTo(2, 0)) {
         for (ColumnTypeMetadata_v2 columnTypeMetadata :
             ((ParquetTableMetadata_v2) 
parquetTableMetadata).columnTypeInfo.values()) {
           if (OriginalType.DATE.equals(columnTypeMetadata.originalType)) {
@@ -261,7 +260,7 @@ public class ParquetReaderUtility {
         Long rowCount = rowGroupMetadata.getRowCount();
         for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
           // Setting Min/Max values for ParquetTableMetadata_v1
-          if (mdVersion_1_0) {
+          if (metadataVersion.isEqualTo(1, 0)) {
             OriginalType originalType = columnMetadata.getOriginalType();
             if (OriginalType.DATE.equals(originalType) && 
columnMetadata.hasSingleValue(rowCount) &&
                 (Integer) columnMetadata.getMaxValue() > 
ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
@@ -271,7 +270,7 @@ public class ParquetReaderUtility {
             }
           }
           // Setting Max values for ParquetTableMetadata_v2
-          else if (mdVersion_2_0 &&
+          else if (metadataVersion.isEqualTo(2, 0) &&
                    columnMetadata.getName() != null &&
                    Arrays.equals(columnMetadata.getName(), names) &&
                    columnMetadata.hasSingleValue(rowCount) &&
@@ -299,7 +298,8 @@ public class ParquetReaderUtility {
     boolean allowBinaryMetadata = 
allowBinaryMetadata(parquetTableMetadata.getDrillVersion(), readerConfig);
 
     // Setting Min / Max values for ParquetTableMetadata_v1
-    if (new MetadataVersion(1, 0).equals(new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
+    MetadataVersion metadataVersion = new 
MetadataVersion(parquetTableMetadata.getMetadataVersion());
+    if (metadataVersion.isEqualTo(1, 0)) {
       for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
         for (RowGroupMetadata rowGroupMetadata : file.getRowGroups()) {
           Long rowCount = rowGroupMetadata.getRowCount();
@@ -320,7 +320,7 @@ public class ParquetReaderUtility {
     int maxNumColumns = 0;
 
     // Setting Min / Max values for V2, V3 and V4 versions; for versions V3_3 
and above need to do decoding
-    boolean needDecoding = new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new 
MetadataVersion(3, 3)) >= 0;
+    boolean needDecoding = metadataVersion.isAtLeast(3, 3);
     for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
       if ( timer != null ) { // for debugging only
         maxRowGroups = Math.max(maxRowGroups, file.getRowGroups().size());
@@ -718,15 +718,17 @@ public class ParquetReaderUtility {
 
   /**
    * Converts list of {@link OriginalType}s to list of {@link 
org.apache.drill.common.types.TypeProtos.MajorType}s.
-   * <b>NOTE</b>: current implementation cares about {@link OriginalType#MAP} 
only
-   * converting it to {@link 
org.apache.drill.common.types.TypeProtos.MinorType#DICT}.
+   * <b>NOTE</b>: current implementation cares about {@link OriginalType#MAP} 
and {@link OriginalType#LIST} only
+   * converting it to {@link 
org.apache.drill.common.types.TypeProtos.MinorType#DICT}
+   * and {@link org.apache.drill.common.types.TypeProtos.MinorType#LIST} 
respectively.
    * Other original types are converted to {@code null}, because there is no 
certain correspondence
-   * (and, actually, a need because these types are used to differentiate 
between Drill's MAP and DICT types
+   * (and, actually, a need because these types are used to differentiate 
between Drill's MAP and DICT (and arrays thereof) types
    * when constructing {@link 
org.apache.drill.exec.record.metadata.TupleSchema}) between these two.
    *
    * @param originalTypes list of Parquet's types
    * @return list containing either {@code null} or type with minor
-   *         type {@link 
org.apache.drill.common.types.TypeProtos.MinorType#DICT} values
+   *         type {@link 
org.apache.drill.common.types.TypeProtos.MinorType#DICT} or
+   *         {@link org.apache.drill.common.types.TypeProtos.MinorType#LIST} 
values
    */
   public static List<TypeProtos.MajorType> getComplexTypes(List<OriginalType> 
originalTypes) {
     List<TypeProtos.MajorType> result = new ArrayList<>();
@@ -735,11 +737,9 @@ public class ParquetReaderUtility {
     }
     for (OriginalType type : originalTypes) {
       if (type == OriginalType.MAP) {
-        TypeProtos.MajorType drillType = TypeProtos.MajorType.newBuilder()
-            .setMinorType(TypeProtos.MinorType.DICT)
-            .setMode(TypeProtos.DataMode.OPTIONAL)
-            .build();
-        result.add(drillType);
+        result.add(Types.required(TypeProtos.MinorType.DICT));
+      } else if (type == OriginalType.LIST) {
+        result.add(Types.required(TypeProtos.MinorType.LIST));
       } else {
         result.add(null);
       }
@@ -807,4 +807,27 @@ public class ParquetReaderUtility {
     }
     return false;
   }
+
+  /**
+   * Converts Parquet's {@link Type.Repetition} to Drill's {@link 
TypeProtos.DataMode}.
+   * @param repetition repetition to be converted
+   * @return data mode corresponding to Parquet's repetition
+   */
+  public static TypeProtos.DataMode getDataMode(Type.Repetition repetition) {
+    TypeProtos.DataMode mode;
+    switch (repetition) {
+      case REPEATED:
+        mode = TypeProtos.DataMode.REPEATED;
+        break;
+      case OPTIONAL:
+        mode = TypeProtos.DataMode.OPTIONAL;
+        break;
+      case REQUIRED:
+        mode = TypeProtos.DataMode.REQUIRED;
+        break;
+      default:
+        throw new IllegalArgumentException(String.format("Unknown Repetition: 
%s.", repetition));
+    }
+    return mode;
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
index 68254d3..0bad959 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
@@ -472,7 +472,7 @@ public class ParquetTableMetadataUtils {
   public static Map<SchemaPath, TypeProtos.MajorType> getRowGroupFields(
       MetadataBase.ParquetTableMetadataBase parquetTableMetadata, 
MetadataBase.RowGroupMetadata rowGroup) {
     Map<SchemaPath, TypeProtos.MajorType> columns = new LinkedHashMap<>();
-    if (new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new 
MetadataVersion(4, 0)) > 0
+    if (new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()).isHigherThan(4, 0)
         && !((Metadata_V4.ParquetTableMetadata_v4) 
parquetTableMetadata).isAllColumnsInteresting()) {
       // adds non-interesting fields from table metadata
       for (MetadataBase.ColumnTypeMetadata columnTypeMetadata : 
parquetTableMetadata.getColumnTypeInfoList()) {
@@ -507,27 +507,61 @@ public class ParquetTableMetadataUtils {
       MetadataBase.ParquetTableMetadataBase parquetTableMetadata) {
     int precision = 0;
     int scale = 0;
-    int definitionLevel = 1;
-    int repetitionLevel = 0;
     MetadataVersion metadataVersion = new 
MetadataVersion(parquetTableMetadata.getMetadataVersion());
     // only ColumnTypeMetadata_v3 and ColumnTypeMetadata_v4 store information 
about scale, precision, repetition level and definition level
-    if (parquetTableMetadata.hasColumnMetadata() && 
(metadataVersion.compareTo(new MetadataVersion(3, 0)) >= 0)) {
+    if (metadataVersion.isAtLeast(3, 0)) {
       scale = parquetTableMetadata.getScale(name);
       precision = parquetTableMetadata.getPrecision(name);
-      repetitionLevel = parquetTableMetadata.getRepetitionLevel(name);
-      definitionLevel = parquetTableMetadata.getDefinitionLevel(name);
     }
+
+    TypeProtos.DataMode mode = getDataMode(parquetTableMetadata, 
metadataVersion, name);
+    return 
TypeProtos.MajorType.newBuilder(ParquetReaderUtility.getType(primitiveType, 
originalType, precision, scale))
+        .setMode(mode)
+        .build();
+  }
+
+  /**
+   * Obtain data mode from table metadata for a column. Algorithm for 
retrieving data mode depends on metadata version:
+   * <ul>
+   *   <li>starting from version {@code 4.2}, Parquet's {@link 
org.apache.parquet.schema.Type.Repetition}
+   *   is stored in table metadata itself;</li>
+   *   <li>starting from {@code 3.0} to {@code 4.2} (exclusively) the data 
mode is
+   *   computed based on max {@code definition} and {@code repetition} levels
+   *   ({@link 
MetadataBase.ParquetTableMetadataBase#getDefinitionLevel(String[])} and
+   *   {@link 
MetadataBase.ParquetTableMetadataBase#getRepetitionLevel(String[])} 
respectively)
+   *   obtained from Parquet's schema;
+   *
+   *   <p><strong>Note:</strong> this computation may lead to erroneous 
results,
+   *   when there are few nesting levels.</p>
+   *   </li>
+   *   <li>prior to {@code 3.0} {@code DataMode.OPTIONAL} is returned.</li>
+   * </ul>
+   * @param tableMetadata Parquet table metadata
+   * @param metadataVersion version of Parquet table metadata
+   * @param name (leaf) column to obtain data mode for
+   * @return data mode of the specified column
+   */
+  private static TypeProtos.DataMode 
getDataMode(MetadataBase.ParquetTableMetadataBase tableMetadata,
+      MetadataVersion metadataVersion, String[] name) {
     TypeProtos.DataMode mode;
-    if (repetitionLevel >= 1) {
-      mode = TypeProtos.DataMode.REPEATED;
-    } else if (repetitionLevel == 0 && definitionLevel == 0) {
-      mode = TypeProtos.DataMode.REQUIRED;
+    if (metadataVersion.isAtLeast(4, 2)) {
+      mode = 
ParquetReaderUtility.getDataMode(tableMetadata.getRepetition(name));
+    } else if (metadataVersion.isAtLeast(3, 0)) {
+      int definitionLevel = tableMetadata.getDefinitionLevel(name);
+      int repetitionLevel = tableMetadata.getRepetitionLevel(name);
+
+      if (repetitionLevel >= 1) {
+        mode = TypeProtos.DataMode.REPEATED;
+      } else if (repetitionLevel == 0 && definitionLevel == 0) {
+        mode = TypeProtos.DataMode.REQUIRED;
+      } else {
+        mode = TypeProtos.DataMode.OPTIONAL;
+      }
     } else {
       mode = TypeProtos.DataMode.OPTIONAL;
     }
-    return 
TypeProtos.MajorType.newBuilder(ParquetReaderUtility.getType(primitiveType, 
originalType, precision, scale))
-        .setMode(mode)
-        .build();
+
+    return mode;
   }
 
   /**
@@ -547,8 +581,7 @@ public class ParquetTableMetadataUtils {
     Map<SchemaPath, TypeProtos.MajorType> columns = new LinkedHashMap<>();
 
     MetadataVersion metadataVersion = new 
MetadataVersion(parquetTableMetadata.getMetadataVersion());
-    boolean hasParentTypes = parquetTableMetadata.hasColumnMetadata()
-        && metadataVersion.compareTo(new MetadataVersion(4, 1)) >= 0;
+    boolean hasParentTypes = metadataVersion.isAtLeast(4, 1);
 
     if (!hasParentTypes) {
       return Collections.emptyMap();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
index 9b74c6d..c79996b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
@@ -185,6 +185,7 @@ public class FileMetadataCollector {
         .totalNullCount(0)
         .interesting(false)
         .parentTypes(colTypeInfo.parentTypes)
+        .repetition(colTypeInfo.repetition)
         .build();
     Metadata_V4.ColumnTypeMetadata_v4.Key columnTypeMetadataKey = new 
Metadata_V4.ColumnTypeMetadata_v4.Key(columnTypeMetadata.name);
 
@@ -254,41 +255,91 @@ public class FileMetadataCollector {
     int scale;
     int repetitionLevel;
     int definitionLevel;
-
-    ColTypeInfo(OriginalType originalType, List<OriginalType> parentTypes,
-                int precision, int scale, int repetitionLevel, int 
definitionLevel) {
-      this.originalType = originalType;
-      this.parentTypes = parentTypes;
-      this.precision = precision;
-      this.scale = scale;
-      this.repetitionLevel = repetitionLevel;
-      this.definitionLevel = definitionLevel;
-    }
+    Type.Repetition repetition;
 
     static ColTypeInfo of(MessageType schema, Type type, String[] path, int 
depth, List<OriginalType> parentTypes) {
       if (type.isPrimitive()) {
-        PrimitiveType primitiveType = (PrimitiveType) type;
-        int precision = 0;
-        int scale = 0;
-        if (primitiveType.getDecimalMetadata() != null) {
-          precision = primitiveType.getDecimalMetadata().getPrecision();
-          scale = primitiveType.getDecimalMetadata().getScale();
-        }
-
-        int repetitionLevel = schema.getMaxRepetitionLevel(path);
-        int definitionLevel = schema.getMaxDefinitionLevel(path);
-
-        return new ColTypeInfo(type.getOriginalType(), parentTypes, precision, 
scale, repetitionLevel, definitionLevel);
+        return createColTypeInfo(type.asPrimitiveType(), schema, path, 
parentTypes);
       }
+
       Type t = ((GroupType) type).getType(path[depth]);
       if (!t.isPrimitive()) {
         OriginalType originalType = t.getOriginalType();
         if (originalType == OriginalType.MAP && 
!ParquetReaderUtility.isLogicalMapType(t.asGroupType())) {
           originalType = null;
+        } else if (originalType == OriginalType.LIST && 
!ParquetReaderUtility.isLogicalListType(t.asGroupType())) {
+          originalType = null;
         }
         parentTypes.add(originalType);
       }
       return of(schema, t, path, depth + 1, parentTypes);
     }
+
+    private static ColTypeInfo createColTypeInfo(PrimitiveType type, 
MessageType schema,
+                                                 String[] path, 
List<OriginalType> parentTypes) {
+      int precision = 0;
+      int scale = 0;
+      if (type.getDecimalMetadata() != null) {
+        precision = type.getDecimalMetadata().getPrecision();
+        scale = type.getDecimalMetadata().getScale();
+      }
+
+      int repetitionLevel = schema.getMaxRepetitionLevel(path);
+      int definitionLevel = schema.getMaxDefinitionLevel(path);
+
+      Type.Repetition repetition;
+      // Check if the primitive has LIST as parent, if it does - this is an 
array of primitives.
+      // (See ParquetReaderUtility#isLogicalListType(GroupType) for the 
REPEATED field structure.)
+      int probableListIndex = parentTypes.size() - 2;
+      if (probableListIndex >= 0 && parentTypes.get(probableListIndex) == 
OriginalType.LIST) {
+        repetition = Type.Repetition.REPEATED;
+      } else {
+        repetition = type.getRepetition();
+      }
+
+      return new ColTypeInfo()
+          .setOriginalType(type.getOriginalType())
+          .setParentTypes(parentTypes)
+          .setPrecision(precision)
+          .setScale(scale)
+          .setRepetitionLevel(repetitionLevel)
+          .setDefinitionLevel(definitionLevel)
+          .setRepetition(repetition);
+    }
+
+    private ColTypeInfo setOriginalType(OriginalType originalType) {
+      this.originalType = originalType;
+      return this;
+    }
+
+    private ColTypeInfo setParentTypes(List<OriginalType> parentTypes) {
+      this.parentTypes = parentTypes;
+      return this;
+    }
+
+    private ColTypeInfo setPrecision(int precision) {
+      this.precision = precision;
+      return this;
+    }
+
+    private ColTypeInfo setScale(int scale) {
+      this.scale = scale;
+      return this;
+    }
+
+    private ColTypeInfo setRepetitionLevel(int repetitionLevel) {
+      this.repetitionLevel = repetitionLevel;
+      return this;
+    }
+
+    private ColTypeInfo setDefinitionLevel(int definitionLevel) {
+      this.definitionLevel = definitionLevel;
+      return this;
+    }
+
+    private ColTypeInfo setRepetition(Type.Repetition repetition) {
+      this.repetition = repetition;
+      return this;
+    }
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 03d1fd0..92a1adb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -592,7 +592,7 @@ public class Metadata {
       } else {
         if (isFileMetadata) {
           parquetTableMetadata.assignFiles((mapper.readValue(is, 
FileMetadata.class)).getFiles());
-          if (new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new 
MetadataVersion(4, 0)) >= 0) {
+          if (new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()).isAtLeast(4, 0)) {
             ((ParquetTableMetadata_v4) 
parquetTableMetadata).updateRelativePaths(metadataParentDirPath);
           }
 
@@ -606,7 +606,7 @@ public class Metadata {
           parquetTableMetadata = new ParquetTableMetadata_v4(metadataSummary);
         } else {
           parquetTableMetadata = mapper.readValue(is, 
ParquetTableMetadataBase.class);
-          if (new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new 
MetadataVersion(3, 0)) >= 0) {
+          if (new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()).isAtLeast(3, 0)) {
             ((Metadata_V3.ParquetTableMetadata_v3) 
parquetTableMetadata).updateRelativePaths(metadataParentDirPath);
           }
           if (!alreadyCheckedModification && 
tableModified((parquetTableMetadata.getDirectories()), path, metadataParentDir, 
metaContext, fs)) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
index 126aa6d..14b2703 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 
 import java.util.List;
 import java.util.Map;
@@ -34,6 +35,8 @@ import static 
org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Const
 import static 
org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V3_2;
 import static 
org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V3_3;
 import static 
org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V4;
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V4_1;
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V4_2;
 
 public class MetadataBase {
 
@@ -56,6 +59,8 @@ public class MetadataBase {
       @JsonSubTypes.Type(value = Metadata_V3.ParquetTableMetadata_v3.class, 
name = V3_2),
       @JsonSubTypes.Type(value = Metadata_V3.ParquetTableMetadata_v3.class, 
name = V3_3),
       @JsonSubTypes.Type(value = Metadata_V4.ParquetTableMetadata_v4.class, 
name = V4),
+      @JsonSubTypes.Type(value = Metadata_V4.ParquetTableMetadata_v4.class, 
name = V4_1),
+      @JsonSubTypes.Type(value = Metadata_V4.ParquetTableMetadata_v4.class, 
name = V4_2),
 
   })
   public static abstract class ParquetTableMetadataBase {
@@ -90,6 +95,11 @@ public class MetadataBase {
     @JsonIgnore public abstract String getMetadataVersion();
 
     @JsonIgnore  public abstract List<? extends ColumnTypeMetadata> 
getColumnTypeInfoList();
+
+    @JsonIgnore
+    public Type.Repetition getRepetition(String[] columnName) {
+      return null;
+    }
   }
 
   public static abstract class ParquetFileMetadata {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java
index 46e4c57..1ffe59d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java
@@ -103,6 +103,45 @@ public class MetadataVersion implements 
Comparable<MetadataVersion> {
         .result();
   }
 
+  /**
+   * Check if this version is at least (equal to or higher than) the one
+   * identified by {@code major} and {@code minor} versions integer literals.
+   *
+   * @param major major version
+   * @param minor minor version
+   * @return {@literal true} if the version is equal to or higher than
+   *         the one it is being checked against
+   */
+  public boolean isAtLeast(int major, int minor) {
+    return this.major > major || (this.major == major && this.minor >= minor);
+  }
+
+  /**
+   * Check if the version is the same as the one identified by
+   * {@code major} and {@code minor} versions integer literals.
+   *
+   * @param major major version
+   * @param minor minor version
+   * @return {@literal true} if the version is equal to the one
+   *         it is being checked against
+   */
+  public boolean isEqualTo(int major, int minor) {
+    return this.major == major && this.minor == minor;
+  }
+
+  /**
+   * Check if this version comes after the one identified by {@code major}
+   * and {@code minor} versions integer literals. That is, this one was 
introduced later.
+   *
+   * @param major major version
+   * @param minor minor version
+   * @return {@literal true} if the version is defined later than
+   *         the one it is being checked against
+   */
+  public boolean isHigherThan(int major, int minor) {
+    return this.major > major || (this.major == major && this.minor > minor);
+  }
+
 /**
  * Supported metadata versions.
  * <p>
@@ -157,6 +196,11 @@ public class MetadataVersion implements 
Comparable<MetadataVersion> {
     public static final String V4_1 = "4.1";
 
     /**
+     * Version 4.2: Added {@link org.apache.parquet.schema.Type.Repetition} to 
{@link Metadata_V4.ColumnTypeMetadata_v4}.
+     */
+    public static final String V4_2 = "4.2";
+
+    /**
      * All historical versions of the Drill metadata cache files. In case of 
introducing a new parquet metadata version
      * please follow the {@link MetadataVersion#FORMAT}.
      */
@@ -168,7 +212,8 @@ public class MetadataVersion implements 
Comparable<MetadataVersion> {
         new MetadataVersion(V3_2),
         new MetadataVersion(V3_3),
         new MetadataVersion(V4),
-        new MetadataVersion(V4_1)
+        new MetadataVersion(V4_1),
+        new MetadataVersion(V4_2)
     );
 
     /**
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
index d17cd31..12d8f30 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,7 +39,7 @@ import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnTy
 import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
 import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
 import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
-import static 
org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V4;
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V4_2;
 
 public class Metadata_V4 {
 
@@ -158,6 +159,11 @@ public class Metadata_V4 {
       return metadataSummary.drillVersion;
     }
 
+    @Override
+    public Type.Repetition getRepetition(String[] columnName) {
+      return getColumnTypeInfo(columnName).repetition;
+    }
+
     public MetadataSummary getSummary() {
       return metadataSummary;
     }
@@ -309,6 +315,8 @@ public class Metadata_V4 {
     public long totalNullCount = 0;
     @JsonProperty
     public boolean isInteresting = false;
+    @JsonProperty
+    public Type.Repetition repetition;
 
     // Key to find by name only
     @JsonIgnore
@@ -329,6 +337,7 @@ public class Metadata_V4 {
       this.totalNullCount = builder.totalNullCount;
       this.isInteresting = builder.isInteresting;
       this.parentTypes = Collections.unmodifiableList(builder.parentTypes);
+      this.repetition = builder.repetition;
     }
 
     @JsonIgnore
@@ -413,6 +422,7 @@ public class Metadata_V4 {
       private int definitionLevel;
       private long totalNullCount;
       private boolean isInteresting;
+      private Type.Repetition repetition;
 
       public Builder name(String[] name) {
         this.name = name;
@@ -464,6 +474,11 @@ public class Metadata_V4 {
         return this;
       }
 
+      public Builder repetition(Type.Repetition repetition) {
+        this.repetition = repetition;
+        return this;
+      }
+
       public ColumnTypeMetadata_v4 build() {
         return new ColumnTypeMetadata_v4(this);
       }
@@ -483,7 +498,7 @@ public class Metadata_V4 {
     }
   }
 
-  @JsonTypeName(V4)
+  @JsonTypeName(V4_2)
   public static class MetadataSummary {
 
     @JsonProperty(value = "metadata_version")
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
index 6e02004..b496f33 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
@@ -875,4 +875,15 @@ public class TestParquetComplex extends BaseTestQuery {
         .baselineValues(3L, 0L, 3L, 3L)
         .go();
   }
+
+  @Test // DRILL-7509
+  public void selectRepeatedMapWithFilter() throws Exception {
+    String query = "select id, struct_array[1].b as b from 
cp.`store/parquet/complex/repeated_struct.parquet` where struct_array[1].b is 
null";
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("id", "b")
+        .baselineValues(2, null)
+        .go();
+  }
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataVersion.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataVersion.java
index f880126..4ff2802 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataVersion.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataVersion.java
@@ -26,6 +26,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 @Category({ParquetTest.class, UnlikelyTest.class})
@@ -123,4 +124,40 @@ public class TestParquetMetadataVersion extends BaseTest {
       throw e;
     }
   }
+
+  @Test
+  public void testAtLeast() {
+    MetadataVersion version = new MetadataVersion("v4.2");
+    assertTrue(version.isAtLeast(4, 0));
+    assertTrue(version.isAtLeast(4, 1));
+    assertTrue(version.isAtLeast(4, 2));
+    assertFalse(version.isAtLeast(4, 3));
+    assertFalse(version.isAtLeast(5, 1));
+    assertTrue(version.isAtLeast(3, 0));
+    assertTrue(version.isAtLeast(1, 0));
+  }
+
+  @Test
+  public void testAfter() {
+    MetadataVersion version = new MetadataVersion(4, 1);
+    assertFalse(version.isHigherThan(4,1));
+    assertFalse(version.isHigherThan(4,3));
+    assertFalse(version.isHigherThan(5,0));
+    assertTrue(version.isHigherThan(4, 0));
+    assertTrue(version.isHigherThan(3, 0));
+    assertTrue(version.isHigherThan(2, 1));
+    assertTrue(version.isHigherThan(1, 3));
+    assertTrue(version.isHigherThan(1, 0));
+  }
+
+  @Test
+  public void testIsEqual() {
+    MetadataVersion version = new MetadataVersion(3, 2);
+    assertTrue(version.isEqualTo(3, 2));
+    assertFalse(version.isEqualTo(4, 2));
+    assertFalse(version.isEqualTo(2, 3));
+    assertFalse(version.isEqualTo(1, 0));
+    assertFalse(version.isEqualTo(3, 1));
+    assertFalse(version.isEqualTo(1, 2));
+  }
 }
diff --git 
a/exec/java-exec/src/test/resources/store/parquet/complex/repeated_struct.parquet
 
b/exec/java-exec/src/test/resources/store/parquet/complex/repeated_struct.parquet
new file mode 100644
index 0000000..c46dc90
Binary files /dev/null and 
b/exec/java-exec/src/test/resources/store/parquet/complex/repeated_struct.parquet
 differ
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
index 99bcdb3..0c8a8bf 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
@@ -306,7 +306,7 @@ public abstract class AbstractColumnMetadata extends 
AbstractPropertied implemen
     builder.append(typeString());
 
     // Drill does not have nullability notion for complex types
-    if (!isNullable() && !isArray() && !isMap()) {
+    if (!isNullable() && !isArray() && !isMap() && !isDict()) {
       builder.append(" NOT NULL");
     }
 
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java
index 6f79336..cecfdd9 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java
@@ -47,21 +47,21 @@ public abstract class AbstractMapColumnMetadata extends 
AbstractColumnMetadata {
    * the children) of the materialized field provided.
    *
    * @param schema the schema to use
-   * @param mapSchema parent schema
+   * @param tupleSchema parent schema
    */
-  AbstractMapColumnMetadata(MaterializedField schema, TupleSchema mapSchema) {
+  AbstractMapColumnMetadata(MaterializedField schema, TupleSchema tupleSchema) 
{
     super(schema);
-    if (mapSchema == null) {
+    if (tupleSchema == null) {
       this.schema = new TupleSchema();
     } else {
-      this.schema = mapSchema;
+      this.schema = tupleSchema;
     }
     this.schema.bind(this);
   }
 
   public AbstractMapColumnMetadata(AbstractMapColumnMetadata from) {
     super(from);
-    schema = (TupleSchema) from.schema.copy();
+    schema = from.schema.copy();
   }
 
   public AbstractMapColumnMetadata(String name, MinorType type, DataMode mode, 
TupleSchema schema) {
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java
index 50957e8..66ba63f 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.record.metadata;
 
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.complex.DictVector;
 
 public class DictColumnMetadata extends AbstractMapColumnMetadata {
 
@@ -31,6 +32,14 @@ public class DictColumnMetadata extends 
AbstractMapColumnMetadata {
     this(schema, null);
   }
 
+  public DictColumnMetadata(String name, TypeProtos.DataMode mode) {
+    this(name, mode, null);
+  }
+
+  public DictColumnMetadata(DictColumnMetadata from) {
+    super(from);
+  }
+
   /**
    * Build a dict column metadata by cloning the type information (but not
    * the children) of the materialized field provided.
@@ -42,12 +51,16 @@ public class DictColumnMetadata extends 
AbstractMapColumnMetadata {
     super(schema, tupleSchema);
   }
 
-  public DictColumnMetadata(DictColumnMetadata from) {
-    super(from);
+  DictColumnMetadata(String name, TypeProtos.DataMode mode, TupleSchema 
tupleSchema) {
+    super(name, TypeProtos.MinorType.DICT, mode, tupleSchema);
+  }
+
+  public ColumnMetadata keyColumnMetadata() {
+    return schema.metadata(DictVector.FIELD_KEY_NAME);
   }
 
-  public DictColumnMetadata(String name, TypeProtos.DataMode mode, TupleSchema 
mapSchema) {
-    super(name, TypeProtos.MinorType.DICT, mode, mapSchema);
+  public ColumnMetadata valueColumnMetadata() {
+    return schema.metadata(DictVector.FIELD_VALUE_NAME);
   }
 
   @Override
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
index 05936a3..03f304c 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
@@ -160,8 +160,8 @@ public class MetadataUtils {
     }
   }
 
-  public static DictColumnMetadata newDict(String name, TupleMetadata schema) {
-    return new DictColumnMetadata(name, DataMode.REQUIRED, (TupleSchema) 
schema);
+  public static DictColumnMetadata newDict(String name) {
+    return new DictColumnMetadata(name, DataMode.REQUIRED);
   }
 
   public static VariantColumnMetadata newVariant(MaterializedField field, 
VariantSchema schema) {
@@ -187,6 +187,10 @@ public class MetadataUtils {
     return new MapColumnMetadata(name, DataMode.REPEATED, (TupleSchema) 
schema);
   }
 
+  public static DictColumnMetadata newDictArray(String name) {
+    return new DictColumnMetadata(name, DataMode.REPEATED);
+  }
+
   public static PrimitiveColumnMetadata newScalar(String name, MinorType type,
       DataMode mode) {
     assert isScalar(type);
diff --git 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
index 4643c31..a89d435 100644
--- 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
+++ 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
@@ -21,8 +21,10 @@ import 
org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.DictColumnMetadata;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -49,8 +51,7 @@ public class SchemaPathUtils {
     ColumnMetadata colMetadata = schema.metadata(colPath.getPath());
     while (!colPath.isLastPath() && colMetadata != null) {
       if (colMetadata.isDict()) {
-        // get dict's value field metadata
-        colMetadata = 
colMetadata.tupleSchema().metadata(0).tupleSchema().metadata(1);
+        colMetadata = ((DictColumnMetadata) colMetadata).valueColumnMetadata();
         break;
       }
       if (!colMetadata.isMap()) {
@@ -64,6 +65,40 @@ public class SchemaPathUtils {
   }
 
   /**
+   * Checks if field identified by the schema path is child in either {@code 
DICT} or {@code REPEATED MAP}.
+   * For such fields, nested in {@code DICT} or {@code REPEATED MAP},
+   * filters can't be removed based on Parquet statistics.
+   *
+   * <p>The need for the check arises because statistics data is not obtained 
for such fields as their representation
+   * differs from the 'canonical' one. For example, field {@code `a`} in 
Parquet's {@code STRUCT ARRAY} is represented
+   * as {@code `struct_array`.`bag`.`array_element`.`a`} but once it is used 
in a filter, {@code ... WHERE struct_array[0].a = 1},
+   * it has different representation (with indexes stripped): {@code 
`struct_array`.`a`} which is not present in statistics.
+   * The same happens with DICT's {@code value}: for {@code SELECT ... WHERE 
dict_col['a'] = 0}, statistics exist for
+   * {@code `dict_col`.`key_value`.`value`} but the field in filter is 
translated to {@code `dict_col`.`a`} and hence it is
+   * considered not present in statistics. If the fields (such as ones shown 
in examples) are {@code OPTIONAL INT} then
+   * the field is considered not present in a table and is treated as {@code 
NULL}. To avoid this situation, the method is used.</p>
+   *
+   * @param schemaPath schema path used in filter
+   * @param schema schema containing all the fields in the file
+   * @return {@literal true} if field is nested inside {@code DICT} (is {@code 
`key`} or {@code `value`})
+   *         or inside {@code REPEATED MAP} field, {@literal false} otherwise.
+   */
+  public static boolean isFieldNestedInDictOrRepeatedMap(SchemaPath 
schemaPath, TupleMetadata schema) {
+    PathSegment.NameSegment colPath = 
schemaPath.getUnIndexed().getRootSegment();
+    ColumnMetadata colMetadata = schema.metadata(colPath.getPath());
+    while (!colPath.isLastPath() && colMetadata != null) {
+      if (colMetadata.isDict() || (colMetadata.isMap() && 
Types.isRepeated(colMetadata.majorType()))) {
+        return true;
+      } else if (!colMetadata.isMap()) {
+        break;
+      }
+      colPath = (PathSegment.NameSegment) colPath.getChild();
+      colMetadata = colMetadata.tupleSchema().metadata(colPath.getPath());
+    }
+    return false;
+  }
+
+  /**
    * Adds column with specified schema path and type into specified {@code 
TupleMetadata schema}.
    * For the case when specified {@link SchemaPath} has children, 
corresponding maps will be created
    * in the {@code TupleMetadata schema} and the last child of the map will 
have specified type.
@@ -73,23 +108,71 @@ public class SchemaPathUtils {
    * @param type       type of the column which should be added
    * @param types      list of column's parent types
    */
-  public static void addColumnMetadata(TupleMetadata schema, SchemaPath 
schemaPath, TypeProtos.MajorType type, Map<SchemaPath, TypeProtos.MajorType> 
types) {
+  public static void addColumnMetadata(TupleMetadata schema, SchemaPath 
schemaPath,
+        TypeProtos.MajorType type, Map<SchemaPath, TypeProtos.MajorType> 
types) {
     PathSegment.NameSegment colPath = 
schemaPath.getUnIndexed().getRootSegment();
     List<String> names = new ArrayList<>(types.size());
+    // Used in case of LIST; defined here to avoid many instantiations inside 
while-loop
+    List<String> nextNames = new ArrayList<>(names.size());
     ColumnMetadata colMetadata;
     while (!colPath.isLastPath()) {
       names.add(colPath.getPath());
       colMetadata = schema.metadata(colPath.getPath());
       TypeProtos.MajorType pathType = 
types.get(SchemaPath.getCompoundPath(names.toArray(new String[0])));
+
+      // The following types, DICT and LIST, contain a nested segment in 
Parquet representation
+      // (see ParquetReaderUtility#isLogicalListType(GroupType) and 
ParquetReaderUtility#isLogicalMapType(GroupType))
+      // which we should skip when creating corresponding TupleMetadata 
representation. Additionally,
+      // there is a need to track if the field is LIST to create appropriate 
column metadata based
+      // on the info: whether to create singular MAP/DICT or MAP/DICT array.
+      boolean isDict = pathType != null && pathType.getMinorType() == 
TypeProtos.MinorType.DICT;
+      boolean isList = pathType != null && pathType.getMinorType() == 
TypeProtos.MinorType.LIST;
+      String name = colPath.getPath();
+
+      if (isList) {
+        nextNames.clear();
+        nextNames.addAll(names);
+
+        // Parquet's LIST group (which represents an array) has
+        // an inner group (bagSegment) which we want to skip here
+        PathSegment.NameSegment bagSegment = 
colPath.getChild().getNameSegment();
+        PathSegment.NameSegment elementSegment = 
bagSegment.getChild().getNameSegment();
+        nextNames.add(bagSegment.getPath());
+        nextNames.add(elementSegment.getPath());
+
+        pathType = types.get(SchemaPath.getCompoundPath(nextNames.toArray(new 
String[0])));
+
+        if (pathType == null && colPath.getChild().getChild().isLastPath()) {
+          // The list is actually a repeated primitive:
+          // will be handled after the while statement
+          break;
+        }
+
+        colPath = elementSegment;
+
+        names.add(bagSegment.getPath());
+        names.add(elementSegment.getPath());
+
+        // Check whether LIST's element type is DICT
+        isDict = pathType != null && pathType.getMinorType() == 
TypeProtos.MinorType.DICT;
+      }
+
       if (colMetadata == null) {
-        if (pathType != null && pathType.getMinorType() == 
TypeProtos.MinorType.DICT) {
-          colMetadata = MetadataUtils.newDict(colPath.getPath(), null);
+        if (isDict) {
+          colMetadata = isList ? MetadataUtils.newDictArray(name) : 
MetadataUtils.newDict(name);
         } else {
-          colMetadata = MetadataUtils.newMap(colPath.getPath(), null);
+          colMetadata = isList ? MetadataUtils.newMapArray(name, null) : 
MetadataUtils.newMap(name, null);
         }
         schema.addColumn(colMetadata);
       }
 
+      if (isDict) {
+        // Parquet's MAP (which corresponds to DICT in Drill) has
+        // an inner group which we want to skip here
+        colPath = (PathSegment.NameSegment) colPath.getChild();
+        names.add(colPath.getPath());
+      }
+
       if (!colMetadata.isMap() && !colMetadata.isDict()) {
         throw new DrillRuntimeException(String.format("Expected map or dict, 
but was %s", colMetadata.majorType()));
       }

Reply via email to