[ https://issues.apache.org/jira/browse/DRILL-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819611#comment-16819611 ]

ASF GitHub Bot commented on DRILL-7062:
---------------------------------------

rhou1 commented on pull request #1738: DRILL-7062: Initial implementation of run-time row-group pruning
URL: https://github.com/apache/drill/pull/1738#discussion_r276035958
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
 ##########
 @@ -68,76 +84,144 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
     List<RecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = new ArrayList<>();
     Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
-    for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
-      /*
-      Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
-      TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
-      we should add more information to the RowGroupInfo that will be populated upon the first read to
-      provide the reader with all of the file meta-data it needs
-      These fields will be added to the constructor below
-      */
-      try {
-        Stopwatch timer = logger.isTraceEnabled() ? Stopwatch.createUnstarted() : null;
-        DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
-        ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
-        if (!footers.containsKey(rowGroup.getPath())) {
-          if (timer != null) {
-            timer.start();
-          }
+    ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
+    RowGroupReadEntry firstRowGroup = null; // to be scanned in case ALL row groups are pruned out
+    ParquetMetadata firstFooter = null;
+    long rowgroupsPruned = 0; // for stats
+    TupleSchema tupleSchema = rowGroupScan.getTupleSchema();
+
+    try {
+
+      LogicalExpression filterExpr = rowGroupScan.getFilter();
+      boolean doRuntimePruning = filterExpr != null && // was a filter given? And it is not just a "TRUE" predicate
+        ! ((filterExpr instanceof ValueExpressions.BooleanExpression) && ((ValueExpressions.BooleanExpression) filterExpr).getBoolean() );
 
-          ParquetMetadata footer = readFooter(fs.getConf(), rowGroup.getPath(), readerConfig);
-          if (timer != null) {
-            long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
-            logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", rowGroup.getPath(), "", 0, 0, 0, timeToRead);
+      // Runtime pruning: Avoid recomputing metadata objects for each row-group in case they use the same file
+      // by keeping the following objects computed earlier (relies on same file being in consecutive rowgroups)
+      Path prevRowGroupPath = null;
+      Metadata_V4.ParquetTableMetadata_v4 tableMetadataV4 = null;
+      Metadata_V4.ParquetFileAndRowCountMetadata fileMetadataV4 = null;
+      FileSelection fileSelection = null;
+      FilterPredicate filterPredicate = null;
+      Set<SchemaPath> schemaPathsInExpr = null;
+      Set<String> columnsInExpr = null;
+
+      // If pruning - Prepare the predicate and the columns before the FOR LOOP
+      if ( doRuntimePruning ) {
+        filterPredicate = AbstractGroupScanWithMetadata.getFilterPredicate(filterExpr, context,
+          (FunctionImplementationRegistry) context.getFunctionRegistry(), context.getOptions(), true,
+          true /* supports file implicit columns */,
+          tupleSchema);
+        // Extract only the relevant columns from the filter (sans implicit columns, if any)
+        schemaPathsInExpr = filterExpr.accept(new FilterEvaluatorUtils.FieldReferenceFinder(), null);
+        columnsInExpr = new HashSet<>();
+        String partitionColumnLabel = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+        for (SchemaPath path : schemaPathsInExpr) {
+          if (rowGroupScan.supportsFileImplicitColumns() &&
+            path.toString().matches(partitionColumnLabel+"\\d+")) {
+            continue;  // skip implicit columns like dir0, dir1
          }
-          footers.put(rowGroup.getPath(), footer);
-        }
-        ParquetMetadata footer = footers.get(rowGroup.getPath());
-
-        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footer,
-          rowGroupScan.getColumns(), readerConfig.autoCorrectCorruptedDates());
-        logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
-
-        boolean useNewReader = context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER);
-        boolean containsComplexColumn = ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns());
-        logger.debug("PARQUET_NEW_RECORD_READER is {}. Complex columns {}.", useNewReader ? "enabled" : "disabled",
-            containsComplexColumn ? "found." : "not found.");
-        RecordReader reader;
-
-        if (useNewReader || containsComplexColumn) {
-          reader = new DrillParquetReader(context,
-              footer,
-              rowGroup,
-              columnExplorer.getTableColumns(),
-              fs,
-              containsCorruptDates);
-        } else {
-          reader = new ParquetRecordReader(context,
-              rowGroup.getPath(),
-              rowGroup.getRowGroupIndex(),
-              rowGroup.getNumRecordsToRead(),
-              fs,
-              CodecFactory.createDirectCodecFactory(fs.getConf(), new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
-              footer,
-              rowGroupScan.getColumns(),
-              containsCorruptDates);
+          columnsInExpr.add(path.getRootSegmentPath());
        }
+        doRuntimePruning = ! columnsInExpr.isEmpty(); // just in case: if no columns - cancel pruning
+      }
 
-        logger.debug("Query {} uses {}",
-            QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()),
-            reader.getClass().getSimpleName());
-        readers.add(reader);
+      for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
+        /*
+        Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
+        TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
+        we should add more information to the RowGroupInfo that will be populated upon the first read to
+        provide the reader with all of the file meta-data it needs
+        These fields will be added to the constructor below
+        */
 
-        List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
-        Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, rowGroupScan.supportsFileImplicitColumns());
-        implicitColumns.add(implicitValues);
-        if (implicitValues.size() > mapWithMaxColumns.size()) {
-          mapWithMaxColumns = implicitValues;
-        }
+          Stopwatch timer = logger.isTraceEnabled() ? Stopwatch.createUnstarted() : null;
+          DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
+          if (!footers.containsKey(rowGroup.getPath())) {
+            if (timer != null) {
+              timer.start();
+            }
+
+            ParquetMetadata footer = readFooter(fs.getConf(), rowGroup.getPath(), readerConfig);
+            if (timer != null) {
+              long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+              logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", rowGroup.getPath(), "", 0, 0, 0, timeToRead);
+            }
+            footers.put(rowGroup.getPath(), footer);
+          }
+          ParquetMetadata footer = footers.get(rowGroup.getPath());
+
+          //
+          //   If a filter is given (and it is not just "TRUE") - then use it to perform run-time pruning
+          //
+          if ( doRuntimePruning ) { // skip when no filter or filter is TRUE
+
+            int rowGroupIndex = rowGroup.getRowGroupIndex();
+            long footerRowCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
+
+            if ( timer != null ) {  // restart the timer, if tracing
+              timer.reset();
+              timer.start();
+            }
+
+            // When starting a new file, or at the first time - Initialize the path specific metadata
+            if ( ! rowGroup.getPath().equals(prevRowGroupPath) ) {
+              // Create a table metadata (V4)
+              tableMetadataV4 = new Metadata_V4.ParquetTableMetadata_v4();
+
+              // The file status for this file
+              FileStatus fileStatus = fs.getFileStatus(rowGroup.getPath());
+
+              // The file metadata (only for the columns used in the filter)
+              fileMetadataV4 = Metadata.getParquetFileMetadata_v4(tableMetadataV4, footer, fileStatus, fs, false, true, columnsInExpr, readerConfig);
+
+              prevRowGroupPath = rowGroup.getPath(); // for next time
+            }
+
+            MetadataBase.RowGroupMetadata rowGroupMetadata = fileMetadataV4.getFileMetadata().getRowGroups().get(rowGroup.getRowGroupIndex());
+
+            Map<SchemaPath, ColumnStatistics> columnsStatistics = ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadataV4, rowGroupMetadata);
 
-      } catch (IOException e) {
-        throw new ExecutionSetupException(e);
+            //
+            // Perform the Run-Time Pruning - i.e. Skip this rowgroup if the match fails
+            //
+            RowsMatch match = FilterEvaluatorUtils.matches(filterPredicate, columnsStatistics, footerRowCount);
+            if (timer != null) { // if tracing
+              long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+              logger.trace("Run-time pruning: {} row-group {} (RG index: {} row count: {}), took {} usec", match == RowsMatch.NONE ? "Excluded" : "Included", rowGroup.getPath(),
 
 Review comment:
   Please also add a keyword such as PARQUET_RUNTIME_PRUNING so that the QA test framework can find this log message and verify it.
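   As an illustration only (the exact token placement and message layout are assumptions, not something the PR specifies), the request could be met by prefixing the existing trace message with a fixed, greppable keyword:

       // Hypothetical variant of the trace line above, with a constant keyword
       // (PARQUET_RUNTIME_PRUNING) that a QA test framework can grep for:
       logger.trace("PARQUET_RUNTIME_PRUNING: {} row-group {} (RG index: {} row count: {}), took {} usec",
           match == RowsMatch.NONE ? "Excluded" : "Included",
           rowGroup.getPath(), rowGroupIndex, footerRowCount, timeToRead);

   Because the keyword is a constant prefix, a test can match on that single token no matter how the rest of the message evolves.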
 
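   Aside: the "relies on same file being in consecutive rowgroups" comment in the hunk describes a remember-the-previous-key cache. A minimal, self-contained Java sketch of that pattern, with hypothetical names (FileMeta, loadFileMetadata) standing in for the Metadata_V4 objects:

       import java.nio.file.Path;

       class PerFileMetadataCache {
         private Path prevPath;        // path handled in the previous iteration
         private FileMeta cachedMeta;  // expensive per-file object (hypothetical type)

         // Recompute the per-file metadata only when the path changes between
         // consecutive entries; otherwise reuse the cached object.
         FileMeta metadataFor(Path path) {
           if (!path.equals(prevPath)) {
             cachedMeta = loadFileMetadata(path); // stand-in for Metadata.getParquetFileMetadata_v4(...)
             prevPath = path;
           }
           return cachedMeta;
         }

         // Hypothetical loader; in the PR this is the per-file Parquet metadata read.
         private FileMeta loadFileMetadata(Path path) {
           return new FileMeta();
         }
       }

       class FileMeta {} // placeholder for the cached per-file metadata

   The trick only pays off when entries for the same file are adjacent; a map keyed by path (like the footers cache in the hunk) handles any arrival order, at the cost of retaining every entry.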


> Run-time row group pruning
> --------------------------
>
>                 Key: DRILL-7062
>                 URL: https://issues.apache.org/jira/browse/DRILL-7062
>             Project: Apache Drill
>          Issue Type: Sub-task
>          Components: Metadata
>            Reporter: Venkata Jyothsna Donapati
>            Assignee: Boaz Ben-Zvi
>            Priority: Major
>             Fix For: 1.17.0
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>



