This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new b64c13df chore: Include first ScanExec batch in metrics (#1105)
b64c13df is described below
commit b64c13df9580b7342082167623e0aa03435c2de5
Author: Andy Grove <[email protected]>
AuthorDate: Tue Nov 19 17:41:31 2024 -0700
chore: Include first ScanExec batch in metrics (#1105)
* include first batch in ScanExec metrics
* record row count metric
* fix regression
---
native/core/src/execution/operators/scan.rs | 50 +++++++++++++++++++++++------
1 file changed, 40 insertions(+), 10 deletions(-)
diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index 7d75f7f1..b2546f83 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -65,6 +65,8 @@ pub struct ScanExec {
pub input_source_description: String,
/// The data types of columns of the input batch. Converted from Spark schema.
pub data_types: Vec<DataType>,
+ /// Schema of first batch
+ pub schema: SchemaRef,
/// The input batch of input data. Used to determine the schema of the input data.
/// It is also used in unit test to mock the input data from JVM.
pub batch: Arc<Mutex<Option<InputBatch>>>,
@@ -72,6 +74,7 @@ pub struct ScanExec {
cache: PlanProperties,
/// Metrics collector
metrics: ExecutionPlanMetricsSet,
+ baseline_metrics: BaselineMetrics,
}
impl ScanExec {
@@ -81,6 +84,9 @@ impl ScanExec {
input_source_description: &str,
data_types: Vec<DataType>,
) -> Result<Self, CometError> {
+ let metrics_set = ExecutionPlanMetricsSet::default();
+ let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
+
// Scan's schema is determined by the input batch, so we need to set it before execution.
// Note that we determine if arrays are dictionary-encoded based on the
// first batch. The array may be dictionary-encoded in some batches and not others, and
@@ -88,7 +94,12 @@ impl ScanExec {
// may end up either unpacking dictionary arrays or dictionary-encoding arrays.
// Dictionary-encoded primitive arrays are always unpacked.
let first_batch = if let Some(input_source) = input_source.as_ref() {
- ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?
+ let mut timer = baseline_metrics.elapsed_compute().timer();
+ let batch =
+ ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?;
+ timer.stop();
+ baseline_metrics.record_output(batch.num_rows());
+ batch
} else {
InputBatch::EOF
};
@@ -96,7 +107,7 @@ impl ScanExec {
let schema = scan_schema(&first_batch, &data_types);
let cache = PlanProperties::new(
- EquivalenceProperties::new(schema),
+ EquivalenceProperties::new(Arc::clone(&schema)),
// The partitioning is not important because we are not using DataFusion's
// query planner or optimizer
Partitioning::UnknownPartitioning(1),
@@ -110,7 +121,9 @@ impl ScanExec {
data_types,
batch: Arc::new(Mutex::new(Some(first_batch))),
cache,
- metrics: ExecutionPlanMetricsSet::default(),
+ metrics: metrics_set,
+ baseline_metrics,
+ schema,
})
}
@@ -276,11 +289,15 @@ impl ExecutionPlan for ScanExec {
}
fn schema(&self) -> SchemaRef {
- // `unwrap` is safe because `schema` is only called during converting
- // Spark plan to DataFusion plan. At the moment, `batch` is not EOF.
- let binding = self.batch.try_lock().unwrap();
- let input_batch = binding.as_ref().unwrap();
- scan_schema(input_batch, &self.data_types)
+ if self.exec_context_id == TEST_EXEC_CONTEXT_ID {
+ // `unwrap` is safe because `schema` is only called during converting
+ // Spark plan to DataFusion plan. At the moment, `batch` is not EOF.
+ let binding = self.batch.try_lock().unwrap();
+ let input_batch = binding.as_ref().unwrap();
+ scan_schema(input_batch, &self.data_types)
+ } else {
+ Arc::clone(&self.schema)
+ }
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
@@ -303,6 +320,7 @@ impl ExecutionPlan for ScanExec {
self.clone(),
self.schema(),
partition,
+ self.baseline_metrics.clone(),
)))
}
@@ -352,8 +370,12 @@ struct ScanStream<'a> {
}
impl<'a> ScanStream<'a> {
- pub fn new(scan: ScanExec, schema: SchemaRef, partition: usize) -> Self {
- let baseline_metrics = BaselineMetrics::new(&scan.metrics, partition);
+ pub fn new(
+ scan: ScanExec,
+ schema: SchemaRef,
+ partition: usize,
+ baseline_metrics: BaselineMetrics,
+ ) -> Self {
let cast_time =
MetricBuilder::new(&scan.metrics).subset_time("cast_time", partition);
Self {
scan,
@@ -465,4 +487,12 @@ impl InputBatch {
InputBatch::Batch(columns, num_rows)
}
+
+ /// Get the number of rows in this batch
+ fn num_rows(&self) -> usize {
+ match self {
+ Self::EOF => 0,
+ Self::Batch(_, num_rows) => *num_rows,
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]