This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 630c09449 Add metrics for ParquetExec (#2499)
630c09449 is described below
commit 630c094496d754669bd1f1da11d9ace6e6ae0b6f
Author: Yang Jiang <[email protected]>
AuthorDate: Wed May 11 20:38:07 2022 +0800
Add metrics for ParquetExec (#2499)
* Add metrics for ParquetExec
* fix row_count
---
datafusion/core/src/physical_plan/file_format/parquet.rs | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index d2e156f32..8ed1bcee5 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -49,6 +49,7 @@ use datafusion_common::Column;
use datafusion_data_access::object_store::ObjectStore;
use datafusion_expr::Expr;
+use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::stream::RecordBatchReceiverStream;
use crate::{
datasource::{file_format::parquet::ChunkObjectReader,
listing::PartitionedFile},
@@ -227,6 +228,7 @@ impl ExecutionPlan for ParquetExec {
files:
self.base_config.file_groups[partition_index].clone().into(),
projector: partition_col_proj,
adapter: SchemaAdapter::new(self.base_config.file_schema.clone()),
+ baseline_metrics: BaselineMetrics::new(&self.metrics,
partition_index),
};
// Use spawn_blocking only if running from a tokio context (#2201)
@@ -308,6 +310,7 @@ struct ParquetExecStream {
files: VecDeque<PartitionedFile>,
projector: PartitionColumnProjector,
adapter: SchemaAdapter,
+ baseline_metrics: BaselineMetrics,
}
impl ParquetExecStream {
@@ -366,6 +369,10 @@ impl Iterator for ParquetExecStream {
type Item = ArrowResult<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
+ let cloned_time = self.baseline_metrics.elapsed_compute().clone();
+ // records time on drop
+ let _timer = cloned_time.timer();
+
if self.error || matches!(self.remaining_rows, Some(0)) {
return None;
}
@@ -413,6 +420,11 @@ impl Iterator for ParquetExecStream {
_ => self.error = result.is_err(),
}
+ //record output rows in parquetExec
+ if let Ok(batch) = &result {
+ self.baseline_metrics.record_output(batch.num_rows());
+ }
+
return Some(result);
}
}
@@ -425,7 +437,8 @@ impl Stream for ParquetExecStream {
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- Poll::Ready(Iterator::next(&mut *self))
+ let poll = Poll::Ready(Iterator::next(&mut *self));
+ self.baseline_metrics.record_poll(poll)
}
}