DRILL-5795: Parquet filter push down now works at the rowgroup level

Before this commit, the filter pruned complete files only. When a file
was composed of multiple rowgroups, the filter could not prune an
individual rowgroup from that file. Now, when the filter finds that a
rowgroup does not match, that rowgroup is removed from the scan.
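
For example (a sketch only: the dfs workspace and file path below are
placeholders; the data layout mirrors the multirowgroup.parquet resource
added by the new unit test, which holds one rowgroup with a = 1 and one
with a = 2), a query such as

    select * from dfs.`/tmp/parquet/multirowgroup.parquet` where a > 1

should now report numRowGroups=1 in its plan, because the rowgroup whose
statistics contain only a = 1 is pruned at planning time.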

closes #949


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3036d370
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3036d370
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3036d370

Branch: refs/heads/master
Commit: 3036d3700aa620bbbffc260e52f633cdaae1172c
Parents: 30da051
Author: Damien Profeta <damien.prof...@amadeus.com>
Authored: Fri Sep 15 11:01:58 2017 -0700
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Mon Nov 13 11:45:21 2017 +0200

----------------------------------------------------------------------
 .../exec/store/parquet/ParquetGroupScan.java    | 171 ++++++++++---------
 .../parquet/TestParquetFilterPushDown.java      |  10 ++
 .../resources/parquet/multirowgroup.parquet     | Bin 0 -> 398 bytes
 3 files changed, 103 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3036d370/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 4e38ce9..972332c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -738,7 +738,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     private EndpointByteMap byteMap;
     private int rowGroupIndex;
-    private String root;
+    private List<? extends ColumnMetadata> columns;
     private long rowCount;  // rowCount = -1 indicates to include all rows.
     private long numRecordsToRead;
 
@@ -791,6 +791,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       return rowCount;
     }
 
+    public List<? extends ColumnMetadata> getColumns() {
+      return columns;
+    }
+
+    public void setColumns(List<? extends ColumnMetadata> columns) {
+      this.columns = columns;
+    }
+
   }
 
   /**
@@ -962,69 +970,70 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
           }
         }
         rowGroupInfo.setEndpointByteMap(endpointByteMap);
+        rowGroupInfo.setColumns(rg.getColumns());
         rgIndex++;
         rowGroupInfos.add(rowGroupInfo);
       }
     }
 
     this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos);
+    updatePartitionColTypeMap();
+  }
 
+  private void updatePartitionColTypeMap() {
     columnValueCounts = Maps.newHashMap();
     this.rowCount = 0;
     boolean first = true;
-    for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
-      for (RowGroupMetadata rowGroup : file.getRowGroups()) {
-        long rowCount = rowGroup.getRowCount();
-        for (ColumnMetadata column : rowGroup.getColumns()) {
-          SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName());
-          Long previousCount = columnValueCounts.get(schemaPath);
-          if (previousCount != null) {
-            if (previousCount != GroupScan.NO_COLUMN_STATS) {
-              if (column.getNulls() != null) {
-                Long newCount = rowCount - column.getNulls();
-            columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
-              }
-            }
-          } else {
+    for (RowGroupInfo rowGroup : this.rowGroupInfos) {
+      long rowCount = rowGroup.getRowCount();
+      for (ColumnMetadata column : rowGroup.getColumns()) {
+        SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName());
+        Long previousCount = columnValueCounts.get(schemaPath);
+        if (previousCount != null) {
+          if (previousCount != GroupScan.NO_COLUMN_STATS) {
             if (column.getNulls() != null) {
               Long newCount = rowCount - column.getNulls();
-              columnValueCounts.put(schemaPath, newCount);
-            } else {
-              columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
+              columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
             }
           }
-          boolean partitionColumn = checkForPartitionColumn(column, first, rowCount);
-          if (partitionColumn) {
-            Map<SchemaPath, Object> map = partitionValueMap.get(file.getPath());
-            if (map == null) {
-              map = Maps.newHashMap();
-              partitionValueMap.put(file.getPath(), map);
+        } else {
+          if (column.getNulls() != null) {
+            Long newCount = rowCount - column.getNulls();
+            columnValueCounts.put(schemaPath, newCount);
+          } else {
+            columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
+          }
+        }
+        boolean partitionColumn = checkForPartitionColumn(column, first, rowCount);
+        if (partitionColumn) {
+          Map<SchemaPath, Object> map = partitionValueMap.get(rowGroup.getPath());
+          if (map == null) {
+            map = Maps.newHashMap();
+            partitionValueMap.put(rowGroup.getPath(), map);
+          }
+          Object value = map.get(schemaPath);
+          Object currentValue = column.getMaxValue();
+          if (value != null) {
+            if (value != currentValue) {
+              partitionColTypeMap.remove(schemaPath);
             }
-            Object value = map.get(schemaPath);
-            Object currentValue = column.getMaxValue();
-            if (value != null) {
-              if (value != currentValue) {
-                partitionColTypeMap.remove(schemaPath);
-              }
+          } else {
+            // the value of a column with primitive type can not be null,
+            // so checks that there are really null value and puts it to the map
+            if (rowCount == column.getNulls()) {
+              map.put(schemaPath, null);
             } else {
-              // the value of a column with primitive type can not be null,
-              // so checks that there are really null value and puts it to the map
-              if (rowCount == column.getNulls()) {
-                map.put(schemaPath, null);
-              } else {
-                map.put(schemaPath, currentValue);
-              }
+              map.put(schemaPath, currentValue);
             }
-          } else {
-            partitionColTypeMap.remove(schemaPath);
           }
+        } else {
+          partitionColTypeMap.remove(schemaPath);
         }
-        this.rowCount += rowGroup.getRowCount();
-        first = false;
       }
+      this.rowCount += rowGroup.getRowCount();
+      first = false;
     }
   }
-
   private ParquetTableMetadataBase removeUnneededRowGroups(ParquetTableMetadataBase parquetTableMetadata) {
     List<ParquetFileMetadata> newFileMetadataList = Lists.newArrayList();
     for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
@@ -1121,6 +1130,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return "ParquetGroupScan [entries=" + entries
         + ", selectionRoot=" + selectionRoot
         + ", numFiles=" + getEntries().size()
+        + ", numRowGroups=" + rowGroupInfos.size()
         + ", usedMetadataFile=" + usedMetadataCache
         + filterStr
         + cacheFileString
@@ -1231,7 +1241,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
   public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
       FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
-    if (fileSet.size() == 1 ||
+    if (rowGroupInfos.size() == 1 ||
         ! (parquetTableMetadata.isRowGroupPrunable()) ||
         rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
         ) {
@@ -1244,66 +1254,71 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
 
-    final List<RowGroupMetadata> qualifiedRGs = new ArrayList<>(parquetTableMetadata.getFiles().size());
+    final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
     Set<String> qualifiedFileNames = Sets.newHashSet(); // HashSet keeps a fileName unique.
 
     ParquetFilterPredicate filterPredicate = null;
 
-    for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+    for (RowGroupInfo rowGroup : rowGroupInfos) {
       final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, this.columns);
-      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(file.getPath(), selectionRoot);
+      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), selectionRoot);
 
-      for (RowGroupMetadata rowGroup : file.getRowGroups()) {
-        ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
-            parquetTableMetadata,
-            rowGroup.getColumns(),
-            implicitColValues);
+      ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
+              parquetTableMetadata,
+              rowGroup.getColumns(),
+              implicitColValues);
 
-        Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
+      Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
 
-        if (filterPredicate == null) {
-          ErrorCollector errorCollector = new ErrorCollectorImpl();
-          LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
-              filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
-
-          if (errorCollector.hasErrors()) {
-            logger.error("{} error(s) encountered when materialize filter expression : {}",
-                errorCollector.getErrorCount(), errorCollector.toErrorString());
-            return null;
-          }
-          //    logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
+      if (filterPredicate == null) {
+        ErrorCollector errorCollector = new ErrorCollectorImpl();
+        LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
+                filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
 
-          Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
-          filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
-              materializedFilter, constantBoundaries, udfUtilities);
-
-          if (filterPredicate == null) {
-            return null;
-          }
+        if (errorCollector.hasErrors()) {
+          logger.error("{} error(s) encountered when materialize filter expression : {}",
+                  errorCollector.getErrorCount(), errorCollector.toErrorString());
+          return null;
         }
+        //    logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
 
-        if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) {
-          continue;
+        Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
+        filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
+                materializedFilter, constantBoundaries, udfUtilities);
+
+        if (filterPredicate == null) {
+          return null;
         }
+      }
 
-        qualifiedRGs.add(rowGroup);
-        qualifiedFileNames.add(file.getPath());  // TODO : optimize when 1 file contains m row groups.
+      if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) {
+        continue;
       }
+
+      qualifiedRGs.add(rowGroup);
+      qualifiedFileNames.add(rowGroup.getPath());  // TODO : optimize when 1 file contains m row groups.
     }
 
-    if (qualifiedFileNames.size() == fileSet.size() ) {
+
+    if (qualifiedRGs.size() == rowGroupInfos.size() ) {
       // There is no reduction of rowGroups. Return the original groupScan.
       logger.debug("applyFilter does not have any pruning!");
       return null;
     } else if (qualifiedFileNames.size() == 0) {
       logger.warn("All rowgroups have been filtered out. Add back one to get schema from scannner");
-      qualifiedFileNames.add(fileSet.iterator().next());
+      RowGroupInfo rg = rowGroupInfos.iterator().next();
+      qualifiedFileNames.add(rg.getPath());
+      qualifiedRGs.add(rg);
     }
 
     try {
       FileSelection newSelection = new FileSelection(null, Lists.newArrayList(qualifiedFileNames), getSelectionRoot(), cacheFileRoot, false);
-      logger.info("applyFilter {} reduce parquet file # from {} to {}", ExpressionStringBuilder.toString(filterExpr), fileSet.size(), qualifiedFileNames.size());
-      return this.clone(newSelection);
+      logger.info("applyFilter {} reduce parquet rowgroup # from {} to {}", ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());
+      ParquetGroupScan clonegroupscan = this.clone(newSelection);
+      clonegroupscan.rowGroupInfos = qualifiedRGs;
+      clonegroupscan.updatePartitionColTypeMap();
+      return clonegroupscan;
+
     } catch (IOException e) {
       logger.warn("Could not apply filter prune due to Exception : {}", e);
       return null;

http://git-wip-us.apache.org/repos/asf/drill/blob/3036d370/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index fa5c8b2..8f56c45 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -384,6 +384,16 @@ public class TestParquetFilterPushDown extends PlanTestBase {
 
   }
 
+  @Test
+  public void testMultiRowGroup() throws Exception {
+    // multirowgroup is a parquet file with 2 rowgroups inside. One with a = 1 and the other with a = 2;
+    // FilterPushDown should be able to remove the rowgroup with a = 1 from the scan operator.
+    final String sql = String.format("select * from dfs_test.`%s/parquet/multirowgroup.parquet` where a > 1", TEST_RES_PATH);
+    final String[] expectedPlan = {"numRowGroups=1"};
+    final String[] excludedPlan = {};
+    PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
+  }
+
   //////////////////////////////////////////////////////////////////////////////////////////////////
   // Some test helper functions.
   //////////////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/drill/blob/3036d370/exec/java-exec/src/test/resources/parquet/multirowgroup.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/multirowgroup.parquet b/exec/java-exec/src/test/resources/parquet/multirowgroup.parquet
new file mode 100644
index 0000000..1cb5551
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/multirowgroup.parquet differ
