DRILL-4139: Add missing Interval, VarBinary and Varchar with nulls partition 
pruning support.
Fix metadata serialization for fixed_len_byte_array types.
Fix partition pruning for decimal type.
Fix loss of scale value for DECIMAL in parquet partition pruning.
Fix partition pruning for primitive types with null values.
Update parquet table metadata version to v3_3.
DRILL-4139: Fix wrong parquet metadata cache version after resolving conflicts 
with DRILL-4264

closes #805


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5df49ab9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5df49ab9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5df49ab9

Branch: refs/heads/master
Commit: 5df49ab9c250114d93aa90507b029fad77f4e6bd
Parents: 1ea191f
Author: Volodymyr Vysotskyi <vvo...@gmail.com>
Authored: Fri Mar 24 16:02:07 2017 +0000
Committer: Paul Rogers <prog...@maprtech.com>
Committed: Sat Sep 30 16:01:22 2017 -0700

----------------------------------------------------------------------
 .../drill/exec/store/parquet/Metadata.java      | 101 +++++++--
 .../exec/store/parquet/MetadataVersion.java     |   9 +-
 .../exec/store/parquet/ParquetGroupScan.java    | 222 +++++++++++++++----
 .../store/parquet/ParquetReaderUtility.java     | 112 +++++++++-
 .../parquet/stat/ParquetMetaStatCollector.java  |  36 ++-
 .../store/parquet/TestParquetMetadataCache.java | 217 +++++++++++++++++-
 6 files changed, 634 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5df49ab9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 5ac10e6..eadbeb0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Iterator;
 
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
@@ -86,6 +87,7 @@ import static 
org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V2;
 import static org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V3;
 import static 
org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V3_1;
 import static 
org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V3_2;
+import static 
org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V3_3;
 
 public class Metadata {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(Metadata.class);
@@ -700,7 +702,8 @@ public class Metadata {
       @JsonSubTypes.Type(value = ParquetTableMetadata_v2.class, name = V2),
       @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name = V3),
       @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name = V3_1),
-      @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name = V3_2)
+      @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name = V3_2),
+      @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name = V3_3)
       })
   public static abstract class ParquetTableMetadataBase {
 
@@ -756,7 +759,7 @@ public class Metadata {
 
     public abstract Long getNulls();
 
-    public abstract boolean hasSingleValue();
+    public abstract boolean hasSingleValue(long rowCount);
 
     public abstract Object getMinValue();
 
@@ -1054,8 +1057,36 @@ public class Metadata {
       return nulls;
     }
 
-    @Override public boolean hasSingleValue() {
-      return (max != null && min != null && max.equals(min));
+    /**
+     * Checks that the column chunk has a single value.
+     * Returns {@code true} if {@code min} and {@code max} are the same but 
not null
+     * and nulls count is 0 or equal to the rows count.
+     * <p>
+     * Returns {@code true} if {@code min} and {@code max} are null and the 
number of null values
+     * in the column chunk is equal to the rows count.
+     * <p>
+     * Comparison of nulls and rows count is needed for the cases:
+     * <ul>
+     * <li>column with primitive type has single value and null values</li>
+     *
+     * <li>column <b>with primitive type</b> has only null values, min/max 
couldn't be null,
+     * but column has single value</li>
+     * </ul>
+     *
+     * @param rowCount rows count in column chunk
+     * @return true if column has single value
+     */
+    @Override
+    public boolean hasSingleValue(long rowCount) {
+      if (nulls != null) {
+        if (min != null) {
+          // Objects.deepEquals() is used here, since min and max may be byte 
arrays
+          return Objects.deepEquals(min, max) && (nulls == 0 || nulls == 
rowCount);
+        } else {
+          return nulls == rowCount && max == null;
+        }
+      }
+      return false;
     }
 
     @Override public Object getMinValue() {
@@ -1363,9 +1394,24 @@ public class Metadata {
       return nulls;
     }
 
+    /**
+     * Checks that the column chunk has a single value.
+     * Returns {@code true} if {@code mxValue} is not null
+     * and nulls count is 0 or if nulls count is equal to the rows count.
+     * <p>
+     * Comparison of nulls and rows count is needed for the cases:
+     * <ul>
+     * <li>column with primitive type has single value and null values</li>
+     *
+     * <li>column <b>with binary type</b> has only null values, so column has 
single value</li>
+     * </ul>
+     *
+     * @param rowCount rows count in column chunk
+     * @return true if column has single value
+     */
     @Override
-    public boolean hasSingleValue() {
-      return (mxValue != null);
+    public boolean hasSingleValue(long rowCount) {
+      return (mxValue != null && nulls == 0) || nulls == rowCount;
     }
 
     @Override public Object getMinValue() {
@@ -1426,7 +1472,7 @@ public class Metadata {
 
   }
 
-  @JsonTypeName(V3_2)
+  @JsonTypeName(V3_3)
   public static class ParquetTableMetadata_v3 extends ParquetTableMetadataBase 
{
     @JsonProperty(value = "metadata_version", access = 
JsonProperty.Access.WRITE_ONLY) private String metadataVersion;
     /*
@@ -1753,9 +1799,36 @@ public class Metadata {
       return nulls;
     }
 
+    /**
+     * Checks that the column chunk has a single value.
+     * Returns {@code true} if {@code minValue} and {@code maxValue} are the 
same but not null
+     * and nulls count is 0 or equal to the rows count.
+     * <p>
+     * Returns {@code true} if {@code minValue} and {@code maxValue} are null 
and the number of null values
+     * in the column chunk is equal to the rows count.
+     * <p>
+     * Comparison of nulls and rows count is needed for the cases:
+     * <ul>
+     * <li>column with primitive type has single value and null values</li>
+     *
+     * <li>column <b>with primitive type</b> has only null values, min/max 
couldn't be null,
+     * but column has single value</li>
+     * </ul>
+     *
+     * @param rowCount rows count in column chunk
+     * @return true if column has single value
+     */
     @Override
-    public boolean hasSingleValue() {
-      return (minValue !=null && maxValue != null && 
minValue.equals(maxValue));
+    public boolean hasSingleValue(long rowCount) {
+      if (nulls != null) {
+        if (minValue != null) {
+          // Objects.deepEquals() is used here, since min and max may be byte 
arrays
+          return Objects.deepEquals(minValue, maxValue) && (nulls == 0 || 
nulls == rowCount);
+        } else {
+          return nulls == rowCount && maxValue == null;
+        }
+      }
+      return false;
     }
 
     @Override public Object getMinValue() {
@@ -1795,8 +1868,9 @@ public class Metadata {
         jgen.writeEndArray();
         if (value.minValue != null) {
           Object val;
-          if (value.primitiveType == PrimitiveTypeName.BINARY && 
value.minValue != null) {
-            val = new String(((Binary) value.minValue).getBytes());
+          if (value.primitiveType == PrimitiveTypeName.BINARY
+              || value.primitiveType == 
PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+            val = ((Binary) value.minValue).getBytes();
           } else {
             val = value.minValue;
           }
@@ -1804,8 +1878,9 @@ public class Metadata {
         }
         if (value.maxValue != null) {
           Object val;
-          if (value.primitiveType == PrimitiveTypeName.BINARY && 
value.maxValue != null) {
-            val = new String(((Binary) value.maxValue).getBytes());
+          if (value.primitiveType == PrimitiveTypeName.BINARY
+              || value.primitiveType == 
PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+            val = ((Binary) value.maxValue).getBytes();
           } else {
             val = value.maxValue;
           }

http://git-wip-us.apache.org/repos/asf/drill/blob/5df49ab9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java
index 5ceadcd..9fd64ac 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java
@@ -141,6 +141,12 @@ public class MetadataVersion implements 
Comparable<MetadataVersion> {
     public static final String V3_2 = "3.2";
 
     /**
+     * Version 3.3: Changed serialization of BINARY and FIXED_LEN_BYTE_ARRAY 
fields.<br>
+     * See DRILL-4139
+     */
+    public static final String V3_3 = "3.3";
+
+    /**
      * All historical versions of the Drill metadata cache files. In case of 
introducing a new parquet metadata version
      * please follow the {@link MetadataVersion#FORMAT}.
      */
@@ -149,7 +155,8 @@ public class MetadataVersion implements 
Comparable<MetadataVersion> {
         new MetadataVersion(V2),
         new MetadataVersion(V3),
         new MetadataVersion(V3_1),
-        new MetadataVersion(V3_2)
+        new MetadataVersion(V3_2),
+        new MetadataVersion(V3_3)
     );
 
     /**

http://git-wip-us.apache.org/repos/asf/drill/blob/5df49ab9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index fd38127..4e38ce9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -36,6 +36,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
@@ -75,6 +76,7 @@ import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.CompleteWork;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
 import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.NullableBigIntVector;
@@ -83,6 +85,7 @@ import org.apache.drill.exec.vector.NullableDecimal18Vector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableFloat8Vector;
 import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableIntervalVector;
 import org.apache.drill.exec.vector.NullableSmallIntVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
 import org.apache.drill.exec.vector.NullableTimeVector;
@@ -360,11 +363,20 @@ public class ParquetGroupScan extends 
AbstractFileGroupScan {
    * potential partition column now no longer qualifies, so it needs to be 
removed from the list.
    * @return whether column is a potential partition column
    */
-  private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, 
boolean first) {
+  private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, 
boolean first, long rowCount) {
     SchemaPath schemaPath = 
SchemaPath.getCompoundPath(columnMetadata.getName());
     final PrimitiveTypeName primitiveType;
     final OriginalType originalType;
+    int precision = 0;
+    int scale = 0;
     if (this.parquetTableMetadata.hasColumnMetadata()) {
+      // only ColumnTypeMetadata_v3 stores information about scale and 
precision
+      if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v3) {
+        Metadata.ColumnTypeMetadata_v3 columnTypeInfo = 
((Metadata.ParquetTableMetadata_v3) parquetTableMetadata)
+                                                                          
.getColumnTypeInfo(columnMetadata.getName());
+        scale = columnTypeInfo.scale;
+        precision = columnTypeInfo.precision;
+      }
       primitiveType = 
this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName());
       originalType = 
this.parquetTableMetadata.getOriginalType(columnMetadata.getName());
     } else {
@@ -372,8 +384,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan 
{
       originalType = columnMetadata.getOriginalType();
     }
     if (first) {
-      if (hasSingleValue(columnMetadata)) {
-        partitionColTypeMap.put(schemaPath, getType(primitiveType, 
originalType));
+      if (hasSingleValue(columnMetadata, rowCount)) {
+        partitionColTypeMap.put(schemaPath, getType(primitiveType, 
originalType, scale, precision));
         return true;
       } else {
         return false;
@@ -382,11 +394,11 @@ public class ParquetGroupScan extends 
AbstractFileGroupScan {
       if (!partitionColTypeMap.keySet().contains(schemaPath)) {
         return false;
       } else {
-        if (!hasSingleValue(columnMetadata)) {
+        if (!hasSingleValue(columnMetadata, rowCount)) {
           partitionColTypeMap.remove(schemaPath);
           return false;
         }
-        if (!getType(primitiveType, 
originalType).equals(partitionColTypeMap.get(schemaPath))) {
+        if (!getType(primitiveType, originalType, scale, 
precision).equals(partitionColTypeMap.get(schemaPath))) {
           partitionColTypeMap.remove(schemaPath);
           return false;
         }
@@ -395,11 +407,21 @@ public class ParquetGroupScan extends 
AbstractFileGroupScan {
     return true;
   }
 
-  public static MajorType getType(PrimitiveTypeName type, OriginalType 
originalType) {
+  /**
+   * Builds major type using given {@code OriginalType originalType} or {@code 
PrimitiveTypeName type}.
+   * For DECIMAL will be returned major type with scale and precision.
+   *
+   * @param type         parquet primitive type
+   * @param originalType parquet original type
+   * @param scale        type scale (used for DECIMAL type)
+   * @param precision    type precision (used for DECIMAL type)
+   * @return major type
+   */
+  public static MajorType getType(PrimitiveTypeName type, OriginalType 
originalType, int scale, int precision) {
     if (originalType != null) {
       switch (originalType) {
         case DECIMAL:
-          return Types.optional(MinorType.DECIMAL18);
+          return Types.withScaleAndPrecision(MinorType.DECIMAL18, 
TypeProtos.DataMode.OPTIONAL, scale, precision);
         case DATE:
           return Types.optional(MinorType.DATE);
         case TIME_MILLIS:
@@ -420,6 +442,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan 
{
           return Types.optional(MinorType.TINYINT);
         case INT_16:
           return Types.optional(MinorType.SMALLINT);
+        case INTERVAL:
+          return Types.optional(MinorType.INTERVAL);
       }
     }
 
@@ -444,10 +468,17 @@ public class ParquetGroupScan extends 
AbstractFileGroupScan {
     }
   }
 
-  private boolean hasSingleValue(ColumnMetadata columnChunkMetaData) {
+  /**
+   * Checks that the column chunk has a single value.
+   *
+   * @param columnChunkMetaData metadata to check
+   * @param rowCount            rows count in column chunk
+   * @return true if column has single value
+   */
+  private boolean hasSingleValue(ColumnMetadata columnChunkMetaData, long 
rowCount) {
     // ColumnMetadata will have a non-null value iff the minValue and the 
maxValue for the
     // rowgroup are the same
-    return (columnChunkMetaData != null) && 
(columnChunkMetaData.hasSingleValue());
+    return (columnChunkMetaData != null) && 
(columnChunkMetaData.hasSingleValue(rowCount));
   }
 
   @Override public void modifyFileSelection(FileSelection selection) {
@@ -476,129 +507,233 @@ public class ParquetGroupScan extends 
AbstractFileGroupScan {
 
   public void populatePruningVector(ValueVector v, int index, SchemaPath 
column, String file) {
     String f = Path.getPathWithoutSchemeAndAuthority(new 
Path(file)).toString();
-    MinorType type = getTypeForColumn(column).getMinorType();
+    MajorType majorType = getTypeForColumn(column);
+    MinorType type = majorType.getMinorType();
     switch (type) {
       case BIT: {
         NullableBitVector bitVector = (NullableBitVector) v;
         Boolean value = (Boolean) partitionValueMap.get(f).get(column);
-        bitVector.getMutator().setSafe(index, value ? 1 : 0);
+        if (value == null) {
+          bitVector.getMutator().setNull(index);
+        } else {
+          bitVector.getMutator().setSafe(index, value ? 1 : 0);
+        }
         return;
       }
       case INT: {
         NullableIntVector intVector = (NullableIntVector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        intVector.getMutator().setSafe(index, value);
+        if (value == null) {
+          intVector.getMutator().setNull(index);
+        } else {
+          intVector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case SMALLINT: {
         NullableSmallIntVector smallIntVector = (NullableSmallIntVector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        smallIntVector.getMutator().setSafe(index, value.shortValue());
+        if (value == null) {
+          smallIntVector.getMutator().setNull(index);
+        } else {
+          smallIntVector.getMutator().setSafe(index, value.shortValue());
+        }
         return;
       }
       case TINYINT: {
         NullableTinyIntVector tinyIntVector = (NullableTinyIntVector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        tinyIntVector.getMutator().setSafe(index, value.byteValue());
+        if (value == null) {
+          tinyIntVector.getMutator().setNull(index);
+        } else {
+          tinyIntVector.getMutator().setSafe(index, value.byteValue());
+        }
         return;
       }
       case UINT1: {
         NullableUInt1Vector intVector = (NullableUInt1Vector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        intVector.getMutator().setSafe(index, value.byteValue());
+        if (value == null) {
+          intVector.getMutator().setNull(index);
+        } else {
+          intVector.getMutator().setSafe(index, value.byteValue());
+        }
         return;
       }
       case UINT2: {
         NullableUInt2Vector intVector = (NullableUInt2Vector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        intVector.getMutator().setSafe(index, (char) value.shortValue());
+        if (value == null) {
+          intVector.getMutator().setNull(index);
+        } else {
+          intVector.getMutator().setSafe(index, (char) value.shortValue());
+        }
         return;
       }
       case UINT4: {
         NullableUInt4Vector intVector = (NullableUInt4Vector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        intVector.getMutator().setSafe(index, value);
+        if (value == null) {
+          intVector.getMutator().setNull(index);
+        } else {
+          intVector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case BIGINT: {
         NullableBigIntVector bigIntVector = (NullableBigIntVector) v;
         Long value = (Long) partitionValueMap.get(f).get(column);
-        bigIntVector.getMutator().setSafe(index, value);
+        if (value == null) {
+          bigIntVector.getMutator().setNull(index);
+        } else {
+          bigIntVector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case FLOAT4: {
         NullableFloat4Vector float4Vector = (NullableFloat4Vector) v;
         Float value = (Float) partitionValueMap.get(f).get(column);
-        float4Vector.getMutator().setSafe(index, value);
+        if (value == null) {
+          float4Vector.getMutator().setNull(index);
+        } else {
+          float4Vector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case FLOAT8: {
         NullableFloat8Vector float8Vector = (NullableFloat8Vector) v;
         Double value = (Double) partitionValueMap.get(f).get(column);
-        float8Vector.getMutator().setSafe(index, value);
+        if (value == null) {
+          float8Vector.getMutator().setNull(index);
+        } else {
+          float8Vector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case VARBINARY: {
         NullableVarBinaryVector varBinaryVector = (NullableVarBinaryVector) v;
         Object s = partitionValueMap.get(f).get(column);
         byte[] bytes;
-        if (s instanceof Binary) {
-          bytes = ((Binary) s).getBytes();
-        } else if (s instanceof String) {
-          bytes = ((String) s).getBytes();
-        } else if (s instanceof byte[]) {
-          bytes = (byte[]) s;
+        if (s == null) {
+          varBinaryVector.getMutator().setNull(index);
+          return;
         } else {
-          throw new UnsupportedOperationException("Unable to create column 
data for type: " + type);
+          bytes = getBytes(type, s);
         }
         varBinaryVector.getMutator().setSafe(index, bytes, 0, bytes.length);
         return;
       }
       case DECIMAL18: {
         NullableDecimal18Vector decimalVector = (NullableDecimal18Vector) v;
-        Long value = (Long) partitionValueMap.get(f).get(column);
+        Object s = partitionValueMap.get(f).get(column);
+        byte[] bytes;
+        if (s == null) {
+          decimalVector.getMutator().setNull(index);
+          return;
+        } else if (s instanceof Integer) {
+          long value = DecimalUtility.getBigDecimalFromPrimitiveTypes(
+                          (Integer) s,
+                          majorType.getScale(),
+                          majorType.getPrecision()).longValue();
+          decimalVector.getMutator().setSafe(index, value);
+          return;
+        } else if (s instanceof Long) {
+          long value = DecimalUtility.getBigDecimalFromPrimitiveTypes(
+                          (Long) s,
+                          majorType.getScale(),
+                          majorType.getPrecision()).longValue();
+          decimalVector.getMutator().setSafe(index, value);
+          return;
+        } else {
+          bytes = getBytes(type, s);
+        }
+        long value = DecimalUtility.getBigDecimalFromByteArray(bytes, 0, 
bytes.length, majorType.getScale()).longValue();
         decimalVector.getMutator().setSafe(index, value);
         return;
       }
       case DATE: {
         NullableDateVector dateVector = (NullableDateVector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        dateVector.getMutator().setSafe(index, value * (long) 
DateTimeConstants.MILLIS_PER_DAY);
+        if (value == null) {
+          dateVector.getMutator().setNull(index);
+        } else {
+          dateVector.getMutator().setSafe(index, value * (long) 
DateTimeConstants.MILLIS_PER_DAY);
+        }
         return;
       }
       case TIME: {
         NullableTimeVector timeVector = (NullableTimeVector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        timeVector.getMutator().setSafe(index, value);
+        if (value == null) {
+          timeVector.getMutator().setNull(index);
+        } else {
+          timeVector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case TIMESTAMP: {
         NullableTimeStampVector timeStampVector = (NullableTimeStampVector) v;
         Long value = (Long) partitionValueMap.get(f).get(column);
-        timeStampVector.getMutator().setSafe(index, value);
+        if (value == null) {
+          timeStampVector.getMutator().setNull(index);
+        } else {
+          timeStampVector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case VARCHAR: {
         NullableVarCharVector varCharVector = (NullableVarCharVector) v;
         Object s = partitionValueMap.get(f).get(column);
         byte[] bytes;
-        if (s instanceof String) { // if the metadata was read from a JSON 
cache file it maybe a string type
-          bytes = ((String) s).getBytes();
-        } else if (s instanceof Binary) {
-          bytes = ((Binary) s).getBytes();
-        } else if (s instanceof byte[]) {
-          bytes = (byte[]) s;
+        if (s == null) {
+          varCharVector.getMutator().setNull(index);
+          return;
         } else {
-          throw new UnsupportedOperationException("Unable to create column 
data for type: " + type);
+          bytes = getBytes(type, s);
         }
         varCharVector.getMutator().setSafe(index, bytes, 0, bytes.length);
         return;
       }
+      case INTERVAL: {
+        NullableIntervalVector intervalVector = (NullableIntervalVector) v;
+        Object s = partitionValueMap.get(f).get(column);
+        byte[] bytes;
+        if (s == null) {
+          intervalVector.getMutator().setNull(index);
+          return;
+        } else {
+          bytes = getBytes(type, s);
+        }
+        intervalVector.getMutator().setSafe(index, 1,
+          ParquetReaderUtility.getIntFromLEBytes(bytes, 0),
+          ParquetReaderUtility.getIntFromLEBytes(bytes, 4),
+          ParquetReaderUtility.getIntFromLEBytes(bytes, 8));
+        return;
+      }
       default:
         throw new UnsupportedOperationException("Unsupported type: " + type);
     }
   }
 
+  /**
+   * Returns the sequence of bytes received from {@code Object source}.
+   *
+   * @param type   the column type
+   * @param source the source of the bytes sequence
+   * @return bytes sequence obtained from {@code Object source}
+   */
+  private byte[] getBytes(MinorType type, Object source) {
+    byte[] bytes;
+    if (source instanceof Binary) {
+      bytes = ((Binary) source).getBytes();
+    } else if (source instanceof byte[]) {
+      bytes = (byte[]) source;
+    } else {
+      throw new UnsupportedOperationException("Unable to create column data 
for type: " + type);
+    }
+    return bytes;
+  }
+
   public static class RowGroupInfo extends ReadEntryFromHDFS implements 
CompleteWork, FileWork {
 
     private EndpointByteMap byteMap;
@@ -684,6 +819,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan 
{
     if (formatConfig.areCorruptDatesAutoCorrected()) {
       
ParquetReaderUtility.correctDatesInMetadataCache(this.parquetTableMetadata);
     }
+    ParquetReaderUtility.correctBinaryInMetadataCache(parquetTableMetadata);
     List<FileStatus> fileStatuses = selection.getStatuses(fs);
 
     if (fileSet == null) {
@@ -857,7 +993,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan 
{
               columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
             }
           }
-          boolean partitionColumn = checkForPartitionColumn(column, first);
+          boolean partitionColumn = checkForPartitionColumn(column, first, 
rowCount);
           if (partitionColumn) {
             Map<SchemaPath, Object> map = 
partitionValueMap.get(file.getPath());
             if (map == null) {
@@ -871,7 +1007,13 @@ public class ParquetGroupScan extends 
AbstractFileGroupScan {
                 partitionColTypeMap.remove(schemaPath);
               }
             } else {
-              map.put(schemaPath, currentValue);
+              // the value of a column with primitive type can not be null,
+              // so check whether all values are really null and, if so, put null into the 
map
+              if (rowCount == column.getNulls()) {
+                map.put(schemaPath, null);
+              } else {
+                map.put(schemaPath, currentValue);
+              }
             }
           } else {
             partitionColTypeMap.remove(schemaPath);

http://git-wip-us.apache.org/repos/asf/drill/blob/5df49ab9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 84e969a..c2580cb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import com.google.common.collect.Sets;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
@@ -38,6 +40,7 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.joda.time.Chronology;
 import org.joda.time.DateTimeConstants;
 import org.apache.parquet.example.data.simple.NanoTime;
@@ -48,6 +51,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Utility class where we can capture common logic between the two parquet 
readers
@@ -157,11 +161,12 @@ public class ParquetReaderUtility {
         // Drill has only ever written a single row group per file, only need 
to correct the statistics
         // on the first row group
         Metadata.RowGroupMetadata rowGroupMetadata = 
file.getRowGroups().get(0);
+        Long rowCount = rowGroupMetadata.getRowCount();
         for (Metadata.ColumnMetadata columnMetadata : 
rowGroupMetadata.getColumns()) {
           // Setting Min/Max values for ParquetTableMetadata_v1
           if (new MetadataVersion(1, 0).equals(new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
             OriginalType originalType = columnMetadata.getOriginalType();
-            if (OriginalType.DATE.equals(originalType) && 
columnMetadata.hasSingleValue() &&
+            if (OriginalType.DATE.equals(originalType) && 
columnMetadata.hasSingleValue(rowCount) &&
                 (Integer) columnMetadata.getMaxValue() > 
ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
               int newMinMax = 
ParquetReaderUtility.autoCorrectCorruptedDate((Integer) 
columnMetadata.getMaxValue());
               columnMetadata.setMax(newMinMax);
@@ -171,7 +176,7 @@ public class ParquetReaderUtility {
           // Setting Max values for ParquetTableMetadata_v2
           else if (new MetadataVersion(2, 0).equals(new 
MetadataVersion(parquetTableMetadata.getMetadataVersion())) &&
               columnMetadata.getName() != null && 
Arrays.equals(columnMetadata.getName(), names) &&
-              columnMetadata.hasSingleValue() && (Integer) 
columnMetadata.getMaxValue() >
+              columnMetadata.hasSingleValue(rowCount) && (Integer) 
columnMetadata.getMaxValue() >
               ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
             int newMax = 
ParquetReaderUtility.autoCorrectCorruptedDate((Integer) 
columnMetadata.getMaxValue());
             columnMetadata.setMax(newMax);
@@ -182,6 +187,109 @@ public class ParquetReaderUtility {
   }
 
   /**
+   * Replaces the string min/max values of BINARY and FIXED_LEN_BYTE_ARRAY columns,
+   * as obtained from the deserialized metadata cache, with byte arrays.
+   * <p>
+   * For v1 metadata the column type is read from the column itself; for later versions the
+   * binary columns are looked up by name. Versions before v3_3 are handled by
+   * {@code setMinMaxValues}, v3_3 and newer versions by {@code convertMinMaxValues}.
+   *
+   * @param parquetTableMetadata table metadata that should be corrected
+   */
+  public static void correctBinaryInMetadataCache(Metadata.ParquetTableMetadataBase parquetTableMetadata) {
+    // Looking for the names of the columns with BINARY data type
+    // in the metadata cache file for V2 and all v3 versions
+    Set<List<String>> columnsNames = getBinaryColumnsNames(parquetTableMetadata);
+
+    // The version is a property of the whole table, so it is parsed and compared once
+    // instead of once per column.
+    MetadataVersion tableVersion = new MetadataVersion(parquetTableMetadata.getMetadataVersion());
+    boolean isV1 = new MetadataVersion(1, 0).equals(tableVersion);
+    boolean olderThanV3_3 = tableVersion.compareTo(new MetadataVersion(3, 3)) < 0;
+
+    for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+      for (Metadata.RowGroupMetadata rowGroupMetadata : file.getRowGroups()) {
+        Long rowCount = rowGroupMetadata.getRowCount();
+        for (Metadata.ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
+          if (isV1) {
+            // Setting Min/Max values for ParquetTableMetadata_v1
+            if (columnMetadata.getPrimitiveType() == PrimitiveTypeName.BINARY
+                || columnMetadata.getPrimitiveType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+              setMinMaxValues(columnMetadata, rowCount);
+            }
+          } else if (columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
+            if (olderThanV3_3) {
+              // Setting Min/Max values for V2 and all V3 versions before V3_3
+              setMinMaxValues(columnMetadata, rowCount);
+            } else {
+              // Setting Min/Max values for V3_3 and all newer versions
+              convertMinMaxValues(columnMetadata, rowCount);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Collects the names of the columns whose primitive type is BINARY or FIXED_LEN_BYTE_ARRAY
+   * from the {@code columnTypeInfo} of the given table metadata. Only
+   * {@code ParquetTableMetadata_v2} and {@code ParquetTableMetadata_v3} instances are inspected;
+   * for any other metadata class the returned set is empty.
+   *
+   * @param parquetTableMetadata table metadata the source of the columns to check
+   * @return set of the lists with column names
+   */
+  private static Set<List<String>> getBinaryColumnsNames(Metadata.ParquetTableMetadataBase parquetTableMetadata) {
+    Set<List<String>> binaryColumns = Sets.newHashSet();
+    if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2) {
+      Metadata.ParquetTableMetadata_v2 tableV2 = (Metadata.ParquetTableMetadata_v2) parquetTableMetadata;
+      for (Metadata.ColumnTypeMetadata_v2 typeInfo : tableV2.columnTypeInfo.values()) {
+        PrimitiveTypeName type = typeInfo.primitiveType;
+        boolean binary = type == PrimitiveTypeName.BINARY || type == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+        if (binary) {
+          binaryColumns.add(Arrays.asList(typeInfo.name));
+        }
+      }
+      return binaryColumns;
+    }
+    if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v3) {
+      Metadata.ParquetTableMetadata_v3 tableV3 = (Metadata.ParquetTableMetadata_v3) parquetTableMetadata;
+      for (Metadata.ColumnTypeMetadata_v3 typeInfo : tableV3.columnTypeInfo.values()) {
+        PrimitiveTypeName type = typeInfo.primitiveType;
+        boolean binary = type == PrimitiveTypeName.BINARY || type == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+        if (binary) {
+          binaryColumns.add(Arrays.asList(typeInfo.name));
+        }
+      }
+    }
+    return binaryColumns;
+  }
+
+  /**
+   * Checks that the column has a single value and, if its Min value was stored as a string,
+   * replaces both Min and Max in {@code columnMetadata} by the bytes of that string.
+   * Nothing is changed when the column has more than one value or Min is not a string.
+   *
+   * @param columnMetadata column metadata that should be changed
+   * @param rowCount       rows count in column chunk
+   */
+  private static void setMinMaxValues(Metadata.ColumnMetadata columnMetadata, long rowCount) {
+    if (!columnMetadata.hasSingleValue(rowCount)) {
+      return;
+    }
+    Object minValue = columnMetadata.getMinValue();
+    // instanceof is already false for null, so no separate null check is needed
+    if (minValue instanceof String) {
+      // Use an explicit charset so the result does not depend on the JVM's default encoding.
+      // NOTE(review): UTF-8 is presumed to be the encoding the cached strings were written with - confirm.
+      byte[] bytes = ((String) minValue).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      // Single-value column: Min and Max are the same value
+      columnMetadata.setMax(bytes);
+      columnMetadata.setMin(bytes);
+    }
+  }
+
+  /**
+   * Checks that the column has a single value and, if its Min value was stored as a string,
+   * replaces both Min and Max in {@code columnMetadata} by the bytes obtained from
+   * Base64-decoding that string.
+   * Nothing is changed when the column has more than one value or Min is not a string.
+   *
+   * @param columnMetadata column metadata that should be changed
+   * @param rowCount       rows count in column chunk
+   */
+  private static void convertMinMaxValues(Metadata.ColumnMetadata columnMetadata, long rowCount) {
+    if (!columnMetadata.hasSingleValue(rowCount)) {
+      return;
+    }
+    Object minValue = columnMetadata.getMinValue();
+    // instanceof is already false for null, so no separate null check is needed
+    if (minValue instanceof String) {
+      // Base64 text is plain ASCII; an explicit charset keeps the encoded bytes independent
+      // of the JVM's default encoding.
+      byte[] encoded = ((String) minValue).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      byte[] bytes = Base64.decodeBase64(encoded);
+      // Single-value column: Min and Max are the same value
+      columnMetadata.setMax(bytes);
+      columnMetadata.setMin(bytes);
+    }
+  }
+
+  /**
    * Check for corrupted dates in a parquet file. See Drill-4203
    */
   public static DateCorruptionStatus detectCorruptDates(ParquetMetadata footer,

http://git-wip-us.apache.org/repos/asf/drill/blob/5df49ab9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
index d86f863..4501cb8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -95,8 +95,17 @@ public class ParquetMetaStatCollector implements  
ColumnStatCollector{
         primitiveType = 
this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName());
         originalType = 
this.parquetTableMetadata.getOriginalType(columnMetadata.getName());
         final Integer repetitionLevel = 
this.parquetTableMetadata.getRepetitionLevel(columnMetadata.getName());
+        int precision = 0;
+        int scale = 0;
+        // ColumnTypeMetadata_v3 stores information about scale and precision
+        if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v3) {
+          Metadata.ColumnTypeMetadata_v3 columnTypeInfo = 
((Metadata.ParquetTableMetadata_v3) parquetTableMetadata)
+                                                                          
.getColumnTypeInfo(columnMetadata.getName());
+          scale = columnTypeInfo.scale;
+          precision = columnTypeInfo.precision;
+        }
 
-        statMap.put(schemaPath, getStat(min, max, numNull, primitiveType, 
originalType, repetitionLevel));
+        statMap.put(schemaPath, getStat(min, max, numNull, primitiveType, 
originalType, repetitionLevel, scale, precision));
       } else {
         final String columnName = schemaPath.getRootSegment().getPath();
         if (implicitColValues.containsKey(columnName)) {
@@ -117,20 +126,35 @@ public class ParquetMetaStatCollector implements  
ColumnStatCollector{
     return statMap;
   }
 
+  /**
+   * Builds column statistics using given primitiveType, originalType, 
repetitionLevel, scale,
+   * precision, numNull, min and max values.
+   *
+   * @param min             min value for statistics
+   * @param max             max value for statistics
+   * @param numNull         num_nulls for statistics
+   * @param primitiveType   type that determines statistics class
+   * @param originalType    type that determines statistics class
+   * @param repetitionLevel field repetition level
+   * @param scale           scale value (used for DECIMAL type)
+   * @param precision       precision value (used for DECIMAL type)
+   * @return column statistics
+   */
   private ColumnStatistics getStat(Object min, Object max, Long numNull,
-      PrimitiveType.PrimitiveTypeName primitiveType, OriginalType 
originalType, Integer repetitionLevel) {
+                                   PrimitiveType.PrimitiveTypeName 
primitiveType, OriginalType originalType,
+                                   Integer repetitionLevel, int scale, int 
precision) {
     Statistics stat = Statistics.getStatsBasedOnType(primitiveType);
     Statistics convertedStat = stat;
 
-    TypeProtos.MajorType type = ParquetGroupScan.getType(primitiveType, 
originalType);
+    TypeProtos.MajorType type = ParquetGroupScan.getType(primitiveType, 
originalType, scale, precision);
 
     // Change to repeated if repetitionLevel > 0
     if (repetitionLevel != null && repetitionLevel > 0) {
-      type = 
TypeProtos.MajorType.newBuilder().setMinorType(type.getMinorType()).setMode(TypeProtos.DataMode.REPEATED).build();
+      type = Types.withScaleAndPrecision(type.getMinorType(), 
TypeProtos.DataMode.REPEATED, scale, precision);
     }
 
     if (numNull != null) {
-      stat.setNumNulls(numNull.longValue());
+      stat.setNumNulls(numNull);
     }
 
     if (min != null && max != null ) {

http://git-wip-us.apache.org/repos/asf/drill/blob/5df49ab9/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 7578476..ca3328e 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -21,10 +21,12 @@ import com.google.common.io.Resources;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.integration.junit4.JMockit;
+import com.google.common.collect.Lists;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.util.TestTools;
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.exec.store.dfs.MetadataContext;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.hadoop.fs.Path;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -714,6 +716,220 @@ public class TestParquetMetadataCache extends 
PlanTestBase {
     }
   }
 
+  /**
+   * Partitions a table by a boolean column and checks the row count and the absence of a
+   * Filter in the plan, first without and then with the metadata cache file.
+   */
+  @Test // DRILL-4139
+  public void testBooleanPartitionPruning() throws Exception {
+    final String table = "dfs_test.tmp.`interval_bool_partition`";
+    final String query = String.format("select * from %s where col_bln = true", table);
+    final String rowCountMessage = "Row count does not match the expected value";
+    try {
+      test("create table %s partition by (col_bln) as select * from cp.`parquet/alltypes_required.parquet`",
+        table);
+
+      // Without the metadata cache file
+      assertEquals(rowCountMessage, 2, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+      test("refresh table metadata %s", table);
+
+      // With the metadata cache file
+      assertEquals(rowCountMessage, 2, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+    } finally {
+      test("drop table if exists %s", table);
+    }
+  }
+
+  /**
+   * Partitions a table by an interval day column and checks the row count and the absence of a
+   * Filter in the plan, first without and then with the metadata cache file.
+   */
+  @Test // DRILL-4139
+  public void testIntervalDayPartitionPruning() throws Exception {
+    final String intervalDayPartitionTable = "dfs_test.tmp.`interval_day_partition`";
+    try {
+      test("create table %s partition by (col_intrvl_day) as " +
+        "select * from cp.`parquet/alltypes_optional.parquet`", intervalDayPartitionTable);
+
+      String query = String.format("select * from %s " +
+        "where col_intrvl_day = cast('P26DT27386S' as interval day)", intervalDayPartitionTable);
+      int expectedRowCount = 1;
+
+      int actualRowCount = testSql(query);
+      assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+      test("refresh table metadata %s", intervalDayPartitionTable);
+
+      actualRowCount = testSql(query);
+      assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+    } finally {
+      // Pass the table name as a format argument, like the other tests in this class,
+      // instead of formatting the statement before handing it to test(...)
+      test("drop table if exists %s", intervalDayPartitionTable);
+    }
+  }
+
+  /**
+   * Partitions a table by an interval year column and checks the row count and the absence of a
+   * Filter in the plan, first without and then with the metadata cache file.
+   */
+  @Test // DRILL-4139
+  public void testIntervalYearPartitionPruning() throws Exception {
+    final String table = "dfs_test.tmp.`interval_yr_partition`";
+    final String query =
+      String.format("select * from %s where col_intrvl_yr = cast('P314M' as interval year)", table);
+    final String rowCountMessage = "Row count does not match the expected value";
+    try {
+      test("create table %s partition by (col_intrvl_yr) as select * from cp.`parquet/alltypes_optional.parquet`",
+        table);
+
+      // Without the metadata cache file
+      assertEquals(rowCountMessage, 1, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+      test("refresh table metadata %s", table);
+
+      // With the metadata cache file
+      assertEquals(rowCountMessage, 1, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+    } finally {
+      test("drop table if exists %s", table);
+    }
+  }
+
+  /**
+   * Partitions a table by a varchar column read from the "optional" all-types file and checks
+   * the row count and the absence of a Filter in the plan, first without and then with the
+   * metadata cache file.
+   */
+  @Test // DRILL-4139
+  public void testVarCharWithNullsPartitionPruning() throws Exception {
+    final String varcharPartitionTable = "dfs_test.tmp.`varchar_optional_partition`";
+    final String query =
+      String.format("select * from %s where col_vrchr = 'Nancy Cloke'", varcharPartitionTable);
+    final String rowCountMessage = "Row count does not match the expected value";
+    try {
+      test("create table %s partition by (col_vrchr) as select * from cp.`parquet/alltypes_optional.parquet`",
+        varcharPartitionTable);
+
+      // Without the metadata cache file
+      assertEquals(rowCountMessage, 1, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+      test("refresh table metadata %s", varcharPartitionTable);
+
+      // With the metadata cache file
+      assertEquals(rowCountMessage, 1, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+    } finally {
+      test("drop table if exists %s", varcharPartitionTable);
+    }
+  }
+
+  /**
+   * Runs the partition pruning check for three CTAS variants of a decimal partition column
+   * (see the comments on each statement), each time without and then with the metadata cache
+   * file. The decimal session option is enabled for each variant and reset afterwards.
+   */
+  @Test // DRILL-4139
+  public void testDecimalPartitionPruning() throws Exception {
+    final String table = "dfs_test.tmp.`decimal_optional_partition`";
+    final String query = String.format("select * from %s where manager_id = 148", table);
+    final String rowCountMessage = "Row count does not match the expected value";
+
+    List<String> ctasStatements = Lists.newArrayList(
+      // decimal stored as fixed_len_byte_array
+      "create table %s partition by (manager_id) as " +
+        "select * from cp.`parquet/fixedlenDecimal.parquet`",
+      // decimal stored as int32
+      "create table %s partition by (manager_id) as " +
+        "select cast(manager_id as decimal(6, 0)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " +
+        "from cp.`parquet/fixedlenDecimal.parquet`",
+      // decimal stored as int64
+      "create table %s partition by (manager_id) as " +
+        "select cast(manager_id as decimal(18, 6)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " +
+        "from cp.`parquet/fixedlenDecimal.parquet`");
+
+    for (String ctas : ctasStatements) {
+      try {
+        test("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+        test(ctas, table);
+
+        // Without the metadata cache file
+        assertEquals(rowCountMessage, 6, testSql(query));
+        PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+        test("refresh table metadata %s", table);
+
+        // With the metadata cache file
+        assertEquals(rowCountMessage, 6, testSql(query));
+        PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+      } finally {
+        test("drop table if exists %s", table);
+        test("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+      }
+    }
+  }
+
+  /**
+   * Creates two sub-tables whose key column is a constant unioned with a non-existent column,
+   * and checks the row count of an equality filter on the key without and then with the
+   * metadata cache file.
+   */
+  @Test // DRILL-4139
+  public void testIntWithNullsPartitionPruning() throws Exception {
+    final String query = "select mykey from dfs_test.tmp.`t5` where mykey = 100";
+    final String rowCountMessage = "Row count does not match the expected value";
+    try {
+      test("create table dfs_test.tmp.`t5/a` as\n"
+        + "select 100 as mykey from cp.`tpch/nation.parquet`\n"
+        + "union all\n"
+        + "select col_notexist from cp.`tpch/region.parquet`");
+
+      test("create table dfs_test.tmp.`t5/b` as\n"
+        + "select 200 as mykey from cp.`tpch/nation.parquet`\n"
+        + "union all\n"
+        + "select col_notexist from cp.`tpch/region.parquet`");
+
+      // Without the metadata cache file
+      assertEquals(rowCountMessage, 25, testSql(query));
+
+      test("refresh table metadata dfs_test.tmp.`t5`");
+
+      // With the metadata cache file
+      assertEquals(rowCountMessage, 25, testSql(query));
+    } finally {
+      test("drop table if exists dfs_test.tmp.`t5`");
+    }
+  }
+
+  /**
+   * Creates one sub-table whose key comes from a non-existent column and one whose key is the
+   * constant 100, then checks an {@code is null} filter: row count and absence of a Filter in
+   * the plan, without and then with the metadata cache file.
+   */
+  @Test // DRILL-4139
+  public void testPartitionPruningWithIsNull() throws Exception {
+    final String query = "select mykey from dfs_test.tmp.t6 where mykey is null";
+    final String rowCountMessage = "Row count does not match the expected value";
+    try {
+      test("create table dfs_test.tmp.`t6/a` as\n"
+        + "select col_notexist as mykey from cp.`tpch/region.parquet`");
+
+      test("create table dfs_test.tmp.`t6/b` as\n"
+        + "select 100 as mykey from cp.`tpch/region.parquet`");
+
+      // Without the metadata cache file
+      assertEquals(rowCountMessage, 5, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+      test("refresh table metadata dfs_test.tmp.`t6`");
+
+      // With the metadata cache file
+      assertEquals(rowCountMessage, 5, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+    } finally {
+      test("drop table if exists dfs_test.tmp.`t6`");
+    }
+  }
+
+  /**
+   * Creates one sub-table whose key comes from a non-existent column and one whose key is the
+   * constant 100, then checks an {@code is not null} filter: row count and absence of a Filter
+   * in the plan, without and then with the metadata cache file.
+   */
+  @Test // DRILL-4139
+  public void testPartitionPruningWithIsNotNull() throws Exception {
+    try {
+      test("create table dfs_test.tmp.`t7/a` as\n" +
+        "select col_notexist as mykey from cp.`tpch/region.parquet`");
+
+      test("create table dfs_test.tmp.`t7/b` as\n" +
+        "select 100 as mykey from cp.`tpch/region.parquet`");
+
+      // The predicate must be "is not null": with "is null" this test only repeated
+      // testPartitionPruningWithIsNull and never exercised the case its name describes.
+      // NOTE(review): the expected count stays 5 on the assumption that every row of t7/b
+      // (one per region row) has the non-null key 100 - confirm by running the test.
+      String query = "select mykey from dfs_test.tmp.t7 where mykey is not null";
+
+      int actualRowCount = testSql(query);
+      assertEquals("Row count does not match the expected value", 5, actualRowCount);
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+      test("refresh table metadata dfs_test.tmp.`t7`");
+
+      actualRowCount = testSql(query);
+      assertEquals("Row count does not match the expected value", 5, actualRowCount);
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+    } finally {
+      test("drop table if exists dfs_test.tmp.`t7`");
+    }
+  }
+
   /**
    * Helper method for checking the metadata file existence
    *
@@ -726,5 +942,4 @@ public class TestParquetMetadataCache extends PlanTestBase {
     assertTrue(String.format("There is no metadata cache file for the %s 
table", table),
         Files.exists(metaFile.toPath()));
   }
-
 }

Reply via email to