This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 630c09449 Add metrics for ParquetExec (#2499)
630c09449 is described below

commit 630c094496d754669bd1f1da11d9ace6e6ae0b6f
Author: Yang Jiang <[email protected]>
AuthorDate: Wed May 11 20:38:07 2022 +0800

    Add metrics for ParquetExec (#2499)
    
    * Add metrics for ParquetExec
    
    * fix row_count
---
 datafusion/core/src/physical_plan/file_format/parquet.rs | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs 
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index d2e156f32..8ed1bcee5 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -49,6 +49,7 @@ use datafusion_common::Column;
 use datafusion_data_access::object_store::ObjectStore;
 use datafusion_expr::Expr;
 
+use crate::physical_plan::metrics::BaselineMetrics;
 use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::{
     datasource::{file_format::parquet::ChunkObjectReader, 
listing::PartitionedFile},
@@ -227,6 +228,7 @@ impl ExecutionPlan for ParquetExec {
             files: 
self.base_config.file_groups[partition_index].clone().into(),
             projector: partition_col_proj,
             adapter: SchemaAdapter::new(self.base_config.file_schema.clone()),
+            baseline_metrics: BaselineMetrics::new(&self.metrics, 
partition_index),
         };
 
         // Use spawn_blocking only if running from a tokio context (#2201)
@@ -308,6 +310,7 @@ struct ParquetExecStream {
     files: VecDeque<PartitionedFile>,
     projector: PartitionColumnProjector,
     adapter: SchemaAdapter,
+    baseline_metrics: BaselineMetrics,
 }
 
 impl ParquetExecStream {
@@ -366,6 +369,10 @@ impl Iterator for ParquetExecStream {
     type Item = ArrowResult<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
+        let cloned_time = self.baseline_metrics.elapsed_compute().clone();
+        // records time on drop
+        let _timer = cloned_time.timer();
+
         if self.error || matches!(self.remaining_rows, Some(0)) {
             return None;
         }
@@ -413,6 +420,11 @@ impl Iterator for ParquetExecStream {
                 _ => self.error = result.is_err(),
             }
 
+            //record output rows in parquetExec
+            if let Ok(batch) = &result {
+                self.baseline_metrics.record_output(batch.num_rows());
+            }
+
             return Some(result);
         }
     }
@@ -425,7 +437,8 @@ impl Stream for ParquetExecStream {
         mut self: Pin<&mut Self>,
         _cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        Poll::Ready(Iterator::next(&mut *self))
+        let poll = Poll::Ready(Iterator::next(&mut *self));
+        self.baseline_metrics.record_poll(poll)
     }
 }
 

Reply via email to the mailing list.