DRILL-5795: Parquet filter pushdown now works at the rowgroup level

Before this commit, the filter pruned complete files only. When a file was composed of multiple rowgroups, it could not prune an individual rowgroup from that file. Now, when the filter finds that a rowgroup does not match, that rowgroup is removed from the scan.
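A minimal way to observe the new behavior, assuming a dfs storage plugin and the multirowgroup.parquet fixture added by this commit (the path below is a placeholder): the physical plan now reports how many rowgroups survive planning.

  -- multirowgroup.parquet contains two rowgroups, one with a = 1 and one with a = 2;
  -- with rowgroup-level pushdown the plan for this query shows numRowGroups=1 instead of 2.
  EXPLAIN PLAN FOR
  SELECT * FROM dfs.`/path/to/parquet/multirowgroup.parquet` WHERE a > 1;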
closes #949


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3036d370
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3036d370
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3036d370

Branch: refs/heads/master
Commit: 3036d3700aa620bbbffc260e52f633cdaae1172c
Parents: 30da051
Author: Damien Profeta <damien.prof...@amadeus.com>
Authored: Fri Sep 15 11:01:58 2017 -0700
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Mon Nov 13 11:45:21 2017 +0200

----------------------------------------------------------------------
 .../exec/store/parquet/ParquetGroupScan.java    | 171 ++++++++++---------
 .../parquet/TestParquetFilterPushDown.java      |  10 ++
 .../resources/parquet/multirowgroup.parquet     | Bin 0 -> 398 bytes
 3 files changed, 103 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3036d370/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 4e38ce9..972332c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -738,7 +738,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     private EndpointByteMap byteMap;
     private int rowGroupIndex;
-    private String root;
+    private List<? extends ColumnMetadata> columns;
     private long rowCount;  // rowCount = -1 indicates to include all rows.
     private long numRecordsToRead;
@@ -791,6 +791,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       return rowCount;
     }
 
+    public List<? extends ColumnMetadata> getColumns() {
+      return columns;
+    }
+
+    public void setColumns(List<? extends ColumnMetadata> columns) {
+      this.columns = columns;
+    }
+
   }
 
   /**
@@ -962,69 +970,70 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
         }
       }
       rowGroupInfo.setEndpointByteMap(endpointByteMap);
+      rowGroupInfo.setColumns(rg.getColumns());
       rgIndex++;
       rowGroupInfos.add(rowGroupInfo);
     }
   }
 
   this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos);
+    updatePartitionColTypeMap();
+  }
+
+  private void updatePartitionColTypeMap() {
     columnValueCounts = Maps.newHashMap();
     this.rowCount = 0;
     boolean first = true;
-    for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
-      for (RowGroupMetadata rowGroup : file.getRowGroups()) {
-        long rowCount = rowGroup.getRowCount();
-        for (ColumnMetadata column : rowGroup.getColumns()) {
-          SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName());
-          Long previousCount = columnValueCounts.get(schemaPath);
-          if (previousCount != null) {
-            if (previousCount != GroupScan.NO_COLUMN_STATS) {
-              if (column.getNulls() != null) {
-                Long newCount = rowCount - column.getNulls();
-                columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
-              }
-            }
-          } else {
+    for (RowGroupInfo rowGroup : this.rowGroupInfos) {
+      long rowCount = rowGroup.getRowCount();
+      for (ColumnMetadata column : rowGroup.getColumns()) {
+        SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName());
+        Long previousCount = columnValueCounts.get(schemaPath);
+        if (previousCount != null) {
+          if (previousCount != GroupScan.NO_COLUMN_STATS) {
             if (column.getNulls() != null) {
               Long newCount = rowCount - column.getNulls();
-              columnValueCounts.put(schemaPath, newCount);
-            } else {
-              columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
+              columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
             }
           }
-          boolean partitionColumn = checkForPartitionColumn(column, first, rowCount);
-          if (partitionColumn) {
-            Map<SchemaPath, Object> map = partitionValueMap.get(file.getPath());
-            if (map == null) {
-              map = Maps.newHashMap();
-              partitionValueMap.put(file.getPath(), map);
+        } else {
+          if (column.getNulls() != null) {
+            Long newCount = rowCount - column.getNulls();
+            columnValueCounts.put(schemaPath, newCount);
+          } else {
+            columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
+          }
+        }
+        boolean partitionColumn = checkForPartitionColumn(column, first, rowCount);
+        if (partitionColumn) {
+          Map<SchemaPath, Object> map = partitionValueMap.get(rowGroup.getPath());
+          if (map == null) {
+            map = Maps.newHashMap();
+            partitionValueMap.put(rowGroup.getPath(), map);
+          }
+          Object value = map.get(schemaPath);
+          Object currentValue = column.getMaxValue();
+          if (value != null) {
+            if (value != currentValue) {
+              partitionColTypeMap.remove(schemaPath);
             }
-            Object value = map.get(schemaPath);
-            Object currentValue = column.getMaxValue();
-            if (value != null) {
-              if (value != currentValue) {
-                partitionColTypeMap.remove(schemaPath);
-              }
+          } else {
+            // the value of a column with primitive type can not be null,
+            // so checks that there are really null value and puts it to the map
+            if (rowCount == column.getNulls()) {
+              map.put(schemaPath, null);
             } else {
-              // the value of a column with primitive type can not be null,
-              // so checks that there are really null value and puts it to the map
-              if (rowCount == column.getNulls()) {
-                map.put(schemaPath, null);
-              } else {
-                map.put(schemaPath, currentValue);
-              }
+              map.put(schemaPath, currentValue);
             }
-          } else {
-            partitionColTypeMap.remove(schemaPath);
           }
+        } else {
+          partitionColTypeMap.remove(schemaPath);
         }
-        this.rowCount += rowGroup.getRowCount();
-        first = false;
       }
+      this.rowCount += rowGroup.getRowCount();
+      first = false;
     }
   }
 
-
   private ParquetTableMetadataBase removeUnneededRowGroups(ParquetTableMetadataBase parquetTableMetadata) {
     List<ParquetFileMetadata> newFileMetadataList = Lists.newArrayList();
     for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
@@ -1121,6 +1130,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return "ParquetGroupScan [entries=" + entries
         + ", selectionRoot=" + selectionRoot
         + ", numFiles=" + getEntries().size()
+        + ", numRowGroups=" + rowGroupInfos.size()
         + ", usedMetadataFile=" + usedMetadataCache
         + filterStr
         + cacheFileString
@@ -1231,7 +1241,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
       FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
 
-    if (fileSet.size() == 1 ||
+    if (rowGroupInfos.size() == 1 ||
         ! (parquetTableMetadata.isRowGroupPrunable()) ||
         rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
         ) {
@@ -1244,66 +1254,71 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
 
-    final List<RowGroupMetadata> qualifiedRGs = new ArrayList<>(parquetTableMetadata.getFiles().size());
+    final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
     Set<String> qualifiedFileNames = Sets.newHashSet(); // HashSet keeps a fileName unique.
 
     ParquetFilterPredicate filterPredicate = null;
 
-    for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+    for (RowGroupInfo rowGroup : rowGroupInfos) {
       final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, this.columns);
-      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(file.getPath(), selectionRoot);
+      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), selectionRoot);
 
-      for (RowGroupMetadata rowGroup : file.getRowGroups()) {
-        ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
-            parquetTableMetadata,
-            rowGroup.getColumns(),
-            implicitColValues);
+      ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
+          parquetTableMetadata,
+          rowGroup.getColumns(),
+          implicitColValues);
 
-        Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
+      Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
 
-        if (filterPredicate == null) {
-          ErrorCollector errorCollector = new ErrorCollectorImpl();
-          LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
-              filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
-
-          if (errorCollector.hasErrors()) {
-            logger.error("{} error(s) encountered when materialize filter expression : {}",
-                errorCollector.getErrorCount(), errorCollector.toErrorString());
-            return null;
-          }
-          // logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
+      if (filterPredicate == null) {
+        ErrorCollector errorCollector = new ErrorCollectorImpl();
+        LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
+            filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
 
-          Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
-          filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
-              materializedFilter, constantBoundaries, udfUtilities);
-
-          if (filterPredicate == null) {
-            return null;
-          }
+        if (errorCollector.hasErrors()) {
+          logger.error("{} error(s) encountered when materialize filter expression : {}",
+              errorCollector.getErrorCount(), errorCollector.toErrorString());
+          return null;
         }
+        // logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
 
-        if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) {
-          continue;
+        Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
+        filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
+            materializedFilter, constantBoundaries, udfUtilities);
+
+        if (filterPredicate == null) {
+          return null;
        }
+      }
 
-        qualifiedRGs.add(rowGroup);
-        qualifiedFileNames.add(file.getPath()); // TODO : optimize when 1 file contains m row groups.
+      if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) {
+        continue;
      }
+
+      qualifiedRGs.add(rowGroup);
+      qualifiedFileNames.add(rowGroup.getPath()); // TODO : optimize when 1 file contains m row groups.
    }
 
-    if (qualifiedFileNames.size() == fileSet.size() ) {
+
+    if (qualifiedRGs.size() == rowGroupInfos.size() ) {
      // There is no reduction of rowGroups. Return the original groupScan.
      logger.debug("applyFilter does not have any pruning!");
      return null;
    } else if (qualifiedFileNames.size() == 0) {
      logger.warn("All rowgroups have been filtered out. Add back one to get schema from scannner");
-      qualifiedFileNames.add(fileSet.iterator().next());
+      RowGroupInfo rg = rowGroupInfos.iterator().next();
+      qualifiedFileNames.add(rg.getPath());
+      qualifiedRGs.add(rg);
    }
 
    try {
      FileSelection newSelection = new FileSelection(null, Lists.newArrayList(qualifiedFileNames), getSelectionRoot(), cacheFileRoot, false);
-      logger.info("applyFilter {} reduce parquet file # from {} to {}", ExpressionStringBuilder.toString(filterExpr), fileSet.size(), qualifiedFileNames.size());
-      return this.clone(newSelection);
+      logger.info("applyFilter {} reduce parquet rowgroup # from {} to {}", ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());
+      ParquetGroupScan clonegroupscan = this.clone(newSelection);
+      clonegroupscan.rowGroupInfos = qualifiedRGs;
+      clonegroupscan.updatePartitionColTypeMap();
+      return clonegroupscan;
+
    } catch (IOException e) {
      logger.warn("Could not apply filter prune due to Exception : {}", e);
      return null;

http://git-wip-us.apache.org/repos/asf/drill/blob/3036d370/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index fa5c8b2..8f56c45 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -384,6 +384,16 @@ public class TestParquetFilterPushDown extends PlanTestBase {
   }
 
+  @Test
+  public void testMultiRowGroup() throws Exception {
+    // multirowgroup is a parquet file with 2 rowgroups inside. One with a = 1 and the other with a = 2;
+    // FilterPushDown should be able to remove the rowgroup with a = 1 from the scan operator.
+    final String sql = String.format("select * from dfs_test.`%s/parquet/multirowgroup.parquet` where a > 1", TEST_RES_PATH);
+    final String[] expectedPlan = {"numRowGroups=1"};
+    final String[] excludedPlan = {};
+    PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
+  }
+
   //////////////////////////////////////////////////////////////////////////////////////////////////
   // Some test helper functions.
   //////////////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/drill/blob/3036d370/exec/java-exec/src/test/resources/parquet/multirowgroup.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/multirowgroup.parquet b/exec/java-exec/src/test/resources/parquet/multirowgroup.parquet
new file mode 100644
index 0000000..1cb5551
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/multirowgroup.parquet differ