[ https://issues.apache.org/jira/browse/DRILL-6331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16449475#comment-16449475 ]
ASF GitHub Bot commented on DRILL-6331:
---------------------------------------

Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183551693

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
+import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
+import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
+
+public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
+
+  protected List<SchemaPath> columns;
+  protected List<ReadEntryWithPath> entries;
+  protected LogicalExpression filter;
+
+  protected ParquetTableMetadataBase parquetTableMetadata;
+  protected List<RowGroupInfo> rowGroupInfos;
+  protected ListMultimap<Integer, RowGroupInfo> mappings;
+  protected Set<String> fileSet;
+
+  private List<EndpointAffinity> endpointAffinities;
+  private ParquetGroupScanStatistics parquetGroupScanStatistics;
+
+  protected AbstractParquetGroupScan(String userName, List<SchemaPath> columns, List<ReadEntryWithPath> entries, LogicalExpression filter) {
+    super(userName);
+    this.columns = columns;
+    this.entries = entries;
+    this.filter = filter;
+  }
+
+  // immutable copy constructor
+  protected AbstractParquetGroupScan(AbstractParquetGroupScan that) {
+    super(that);
+    this.columns = that.columns == null ? null : new ArrayList<>(that.columns);
+    this.parquetTableMetadata = that.parquetTableMetadata;
+    this.rowGroupInfos = that.rowGroupInfos == null ? null : new ArrayList<>(that.rowGroupInfos);
+    this.filter = that.filter;
+    this.endpointAffinities = that.endpointAffinities == null ? null : new ArrayList<>(that.endpointAffinities);
+    this.mappings = that.mappings == null ? null : ArrayListMultimap.create(that.mappings);
+    this.parquetGroupScanStatistics = that.parquetGroupScanStatistics == null ? null : new ParquetGroupScanStatistics(that.parquetGroupScanStatistics);
+    this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
+    this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
+  }
+
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty
+  public List<ReadEntryWithPath> getEntries() {
+    return entries;
+  }
+
+  @JsonIgnore
+  @Override
+  public Collection<String> getFiles() {
+    return fileSet;
+  }
+
+  @Override
+  public boolean hasFiles() {
+    return true;
+  }
+
+  @Override
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  /**
+   * Return column value count for the specified column.
+   * If does not contain such column, return 0.
+   * Is used when applying convert to direct scan rule.
+   *
+   * @param column column schema path
+   * @return column value count
+   */
+  @Override
+  public long getColumnValueCount(SchemaPath column) {
+    return parquetGroupScanStatistics.getColumnValueCount(column);
+  }
+
+  /**
+   * Calculates the affinity each endpoint has for this scan,
+   * by adding up the affinity each endpoint has for each rowGroup.
+   *
+   * @return a list of EndpointAffinity objects
+   */
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return endpointAffinities;
+  }
+
+  @Override
+  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> incomingEndpoints) {
+    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return rowGroupInfos.size();
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    int columnCount = columns == null ? 20 : columns.size();
+    long rowCount = parquetGroupScanStatistics.getRowCount();
+    ScanStats scanStats = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, rowCount, 1, rowCount * columnCount);
+    logger.trace("Drill parquet scan statistics: {}", scanStats);
+    return scanStats;
+  }
+
+  protected List<RowGroupReadEntry> getReadEntries(int minorFragmentId) {
+    assert minorFragmentId < mappings.size() : String
+        .format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.",
+            mappings.size(), minorFragmentId);
+
+    List<RowGroupInfo> rowGroupsForMinor = mappings.get(minorFragmentId);
+
+    Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
+        String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
+
+    List<RowGroupReadEntry> entries = new ArrayList<>();
+    for (RowGroupInfo rgi : rowGroupsForMinor) {
+      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead());
+      entries.add(entry);
+    }
+    return entries;
+  }
+
+  // filter push down methods block start
+  @JsonProperty
+  @Override
+  public LogicalExpression getFilter() {
+    return filter;
+  }
+
+  public void setFilter(LogicalExpression filter) {
+    this.filter = filter;
+  }
+
+  @Override
+  public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
+      FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
+
+    if (rowGroupInfos.size() == 1 ||
+        ! (parquetTableMetadata.isRowGroupPrunable()) ||
+        rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
+        ) {
+      // Stop pruning for 3 cases:
+      //    - 1 single parquet file,
+      //    - metadata does not have proper format to support row group level filter pruning,
+      //    - # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
+      return null;
+    }
+
+    final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
+
+    final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
+    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a fileName unique.
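+    // For each row group below, implicit and partition column values are resolved,
+    // per-column statistics are collected from the Parquet metadata, and the filter
+    // expression is materialized once on the first iteration.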
+
+    ParquetFilterPredicate filterPredicate = null;
+
+    for (RowGroupInfo rowGroup : rowGroupInfos) {
+      final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
+      List<String> partitionValues = getPartitionValues(rowGroup);
+      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, supportsFileImplicitColumns());
+
+      ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
+          parquetTableMetadata,
+          rowGroup.getColumns(),
+          implicitColValues);
+
+      Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
+
+      if (filterPredicate == null) {
+        ErrorCollector errorCollector = new ErrorCollectorImpl();
+        LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
+            filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
+
+        if (errorCollector.hasErrors()) {
+          logger.error("{} error(s) encountered when materialize filter expression : {}",
+              errorCollector.getErrorCount(), errorCollector.toErrorString());
+          return null;
+        }
+        // logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
--- End diff --

Looks like it is a useful logger. Uncomment?

> Parquet filter pushdown does not support the native hive reader
> ---------------------------------------------------------------
>
>                 Key: DRILL-6331
>                 URL: https://issues.apache.org/jira/browse/DRILL-6331
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Storage - Hive
>    Affects Versions: 1.13.0
>            Reporter: Arina Ielchiieva
>            Assignee: Arina Ielchiieva
>            Priority: Major
>             Fix For: 1.14.0
>
> Initially HiveDrillNativeParquetGroupScan was based mainly on HiveScan; the
> core difference between them was that HiveDrillNativeParquetScanBatchCreator
> created a ParquetRecordReader instead of a HiveReader.
> This allowed Hive parquet files to be read with Drill's native parquet reader,
> but did not expose Hive data to Drill optimizations such as filter push down,
> limit push down, and count-to-direct-scan optimization.
> Hive code had to be refactored to use the same interfaces as ParquetGroupScan
> in order to be exposed to such optimizations.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
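A minimal sketch of what the suggested uncommenting could look like, assuming the
statement is restored inside applyFilter() right after the error check.
ExpressionStringBuilder is already imported in this file; the isDebugEnabled()
guard is an optional, assumed addition so the expression-to-string conversion
only runs when debug logging is enabled:

    // Hypothetical restoration of the commented-out debug statement, guarded so
    // the ExpressionStringBuilder.toString() call is skipped at higher log levels.
    if (logger.isDebugEnabled()) {
      logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
    }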