[ https://issues.apache.org/jira/browse/DRILL-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812142#comment-16812142 ]
ASF GitHub Bot commented on DRILL-7062:
---------------------------------------

amansinha100 commented on pull request #1738: DRILL-7062: Initial implementation of run-time row-group pruning
URL: https://github.com/apache/drill/pull/1738#discussion_r272898010

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
##########

@@ -68,76 +83,131 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
     List<RecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = new ArrayList<>();
     Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
-    for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
-      /*
-        Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
-        TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
-        we should add more information to the RowGroupInfo that will be populated upon the first read to
-        provide the reader with all of th file meta-data it needs
-        These fields will be added to the constructor below
-      */
-      try {
-        Stopwatch timer = logger.isTraceEnabled() ? Stopwatch.createUnstarted() : null;
-        DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
-        ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
-        if (!footers.containsKey(rowGroup.getPath())) {
-          if (timer != null) {
-            timer.start();
+    ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
+    RowGroupReadEntry firstRowGroup = null; // to be scanned in case ALL row groups are pruned out
+    ParquetMetadata firstFooter = null;
+    long rowgroupsPruned = 0; // for stats
+
+    try {
+
+      LogicalExpression filterExpr = rowGroupScan.getFilter();
+      Path selectionRoot = rowGroupScan.getSelectionRoot();
+      // Runtime pruning: Avoid recomputing metadata objects for each row-group in case they use the same file
+      // by keeping the following objects computed earlier (relies on same file being in consecutive rowgroups)
+      Path prevRowGroupPath = null;
+      Metadata_V3.ParquetTableMetadata_v3 tableMetadataV3 = null;
+      Metadata_V3.ParquetFileMetadata_v3 fileMetadataV3 = null;
+      FileSelection fileSelection = null;
+      ParquetTableMetadataProviderImpl metadataProvider = null;
+
+      for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
+        /*
+          Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
+          TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
+          we should add more information to the RowGroupInfo that will be populated upon the first read to
+          provide the reader with all of th file meta-data it needs
+          These fields will be added to the constructor below
+        */
+
+        Stopwatch timer = logger.isTraceEnabled() ? Stopwatch.createUnstarted() : null;
+        DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
+        if (!footers.containsKey(rowGroup.getPath())) {
+          if (timer != null) {
+            timer.start();
+          }
+
+          ParquetMetadata footer = readFooter(fs.getConf(), rowGroup.getPath(), readerConfig);
+          if (timer != null) {
+            long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+            logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", rowGroup.getPath(), "", 0, 0, 0, timeToRead);
+          }
+          footers.put(rowGroup.getPath(), footer);
         }
+        ParquetMetadata footer = footers.get(rowGroup.getPath());
+
+        //
+        // If a filter is given (and it is not just "TRUE") - then use it to perform run-time pruning
+        //
+        if ( filterExpr != null && ! (filterExpr instanceof ValueExpressions.BooleanExpression) ) { // skip when no filter or filter is TRUE
+
+          int rowGroupIndex = rowGroup.getRowGroupIndex();
+          long footerRowCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
+
+          if ( timer != null ) { // restart the timer, if tracing
+            timer.reset();
+            timer.start();
+          }
+
+          // When starting a new file, or at the first time - Initialize path specific metadata etc
+          if ( ! rowGroup.getPath().equals(prevRowGroupPath) ) {
+            // Get the table metadata (V3)
+            tableMetadataV3 = Metadata.getParquetTableMetadata(footer, fs, rowGroup.getPath().toString(), readerConfig);
+
+            // The file status for this file
+            FileStatus fileStatus = fs.getFileStatus(rowGroup.getPath());
+            List<FileStatus> listFileStatus = new ArrayList<>(Arrays.asList(fileStatus));
+            List<Path> listRowGroupPath = new ArrayList<>(Arrays.asList(rowGroup.getPath()));
+            List<ReadEntryWithPath> entries = new ArrayList<>(Arrays.asList(new ReadEntryWithPath(rowGroup.getPath())));
+            fileSelection = new FileSelection(listFileStatus, listRowGroupPath, selectionRoot);
+
+            metadataProvider = new ParquetTableMetadataProviderImpl(entries, selectionRoot, fileSelection.cacheFileRoot, readerConfig, fs, false);
+            // The file metadata (for all columns)
+            fileMetadataV3 = Metadata.getParquetFileMetadata_v3(tableMetadataV3, footer, fileStatus, fs, true, null, readerConfig);
+
+            prevRowGroupPath = rowGroup.getPath(); // for next time
+          }
+
+          MetadataBase.RowGroupMetadata rowGroupMetadata = fileMetadataV3.getRowGroups().get(rowGroup.getRowGroupIndex());
+
+          Map<SchemaPath, ColumnStatistics> columnsStatistics = ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadataV3, rowGroupMetadata);
+
+          List<SchemaPath> columns = columnsStatistics.keySet().stream().collect(Collectors.toList());
+
+          ParquetGroupScan parquetGroupScan = new ParquetGroupScan( context.getQueryUserName(), metadataProvider, fileSelection, columns, readerConfig, filterExpr);

Review comment:
   I am actually less concerned about the elapsed time and more about the number of objects created on the Java heap. Each `ParquetGroupScan` is at least 100 bytes, and since we are targeting use cases with about a million row groups per minor fragment, that means 100 MB per minor fragment (assuming the garbage collector does not do the cleanup while the for loop is executing).
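As an aside, the per-file reuse pattern in the patch (the `prevRowGroupPath` check) can be reduced to a small, self-contained sketch. The names here (`RowGroupEntry`, `loadFileMetadata`) are illustrative stand-ins, not Drill's actual API; the point is only that the expensive per-file step runs once per file when same-file row groups arrive consecutively:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Drill's RowGroupReadEntry: a file path plus a row-group index.
class RowGroupEntry {
  final String path;
  final int index;
  RowGroupEntry(String path, int index) { this.path = path; this.index = index; }
}

public class PerFileMetadataCache {
  // How many times the "expensive" per-file step ran during the last process() call.
  static int lastRunLoads;

  // Simulates an expensive per-file computation (e.g. parsing a Parquet footer).
  static String loadFileMetadata(String path) {
    lastRunLoads++;
    return "metadata[" + path + "]";
  }

  // Mirrors the prevRowGroupPath check in the patch: per-file metadata is recomputed
  // only when the path changes, relying on row groups of the same file being consecutive.
  static List<String> process(List<RowGroupEntry> rowGroups) {
    lastRunLoads = 0;
    List<String> results = new ArrayList<>();
    String prevPath = null;
    String fileMetadata = null;
    for (RowGroupEntry rg : rowGroups) {
      if (!rg.path.equals(prevPath)) {   // new file: compute once, then reuse
        fileMetadata = loadFileMetadata(rg.path);
        prevPath = rg.path;
      }
      results.add(fileMetadata + "#rg" + rg.index);
    }
    return results;
  }

  public static void main(String[] args) {
    process(Arrays.asList(
        new RowGroupEntry("/data/a.parquet", 0),
        new RowGroupEntry("/data/a.parquet", 1),
        new RowGroupEntry("/data/b.parquet", 0)));
    // Two distinct files, so the expensive step ran twice for three row groups.
    System.out.println(lastRunLoads); // prints 2
  }
}
```

Note the caveat from the patch comment applies to the sketch as well: if row groups of the same file are interleaved with other files, the cache misses every time and degrades to one load per row group.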
If there are 24 minor fragments, this would add up to 2.4 GB of heap per node, which would subsequently cause quite a bit of GC activity. I have made some changes on top of your branch which avoid this by directly calling a static method to create a FilterPredicate. The pruning unit tests pass with these changes. I will send my changes to you offline and we can discuss.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Run-time row group pruning
> --------------------------
>
>                 Key: DRILL-7062
>                 URL: https://issues.apache.org/jira/browse/DRILL-7062
>             Project: Apache Drill
>          Issue Type: Sub-task
>          Components: Metadata
>            Reporter: Venkata Jyothsna Donapati
>            Assignee: Boaz Ben-Zvi
>            Priority: Major
>             Fix For: 1.16.0
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
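[Editor's note] The reviewer's suggestion, building the filter predicate once via a static method and reusing it across row groups instead of allocating a heavyweight planning object (such as a `ParquetGroupScan`) per row group, can be illustrated with a minimal sketch. Everything here (`RowGroupStats`, `lessThan`) is a hypothetical stand-in, not Drill's or Parquet's actual FilterPredicate API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical per-row-group statistics for a single column (min/max from the footer).
class RowGroupStats {
  final long min;
  final long max;
  RowGroupStats(long min, long max) { this.min = min; this.max = max; }
}

public class RuntimePruningSketch {
  // A static factory in the spirit of the reviewer's suggestion: construct the
  // predicate ONCE, then apply the same lightweight object to every row group,
  // so the per-row-group loop allocates nothing on the heap.
  static Predicate<RowGroupStats> lessThan(long literal) {
    // For a filter "col < literal": a row group whose minimum already reaches
    // the literal cannot contain matching rows, so it can be pruned.
    return stats -> stats.min >= literal; // true => prune this row group
  }

  public static void main(String[] args) {
    Predicate<RowGroupStats> canPrune = lessThan(100);
    List<RowGroupStats> rowGroups = Arrays.asList(
        new RowGroupStats(150, 200),  // min >= 100: prunable
        new RowGroupStats(10, 50));   // overlaps the filter range: must be scanned
    long pruned = rowGroups.stream().filter(canPrune).count();
    System.out.println(pruned); // prints 1
  }
}
```

With roughly a million row groups per minor fragment, the difference between one shared predicate and one ~100-byte object per iteration is exactly the heap pressure the review comment describes.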