This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 32f281a2 feat: capture more metrics in distributed_query (#1353)
32f281a2 is described below
commit 32f281a2a7cada087c7eebe6d3a4cbd355d45af0
Author: Hoang Pham <[email protected]>
AuthorDate: Fri Jan 2 19:12:10 2026 +0700
feat: capture more metrics in distributed_query (#1353)
* add more metrics
* use queue_at
---------
Signed-off-by: Hoang Pham <[email protected]>
---
ballista/client/tests/context_checks.rs | 39 ++++++++++++++++++
.../core/src/execution_plans/distributed_query.rs | 48 ++++++++++++++++++++--
2 files changed, 83 insertions(+), 4 deletions(-)
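For context, the assertions added below read the new metrics through DataFusion's MetricsSet API (sum_by_name / as_usize). A minimal sketch of how a client could inspect them after collecting a query; the helper name and setup are illustrative, not part of this commit:

    use datafusion::physical_plan::ExecutionPlan;

    // Hypothetical helper: print the timing metrics recorded by a
    // DistributedQueryExec once the query has been collected.
    fn print_timing_metrics(plan: &dyn ExecutionPlan) {
        if let Some(metrics) = plan.metrics() {
            for name in [
                "job_execution_time_ms",
                "job_scheduling_in_ms",
                "total_query_time_ms",
            ] {
                if let Some(value) = metrics.sum_by_name(name) {
                    println!("{name}: {} ms", value.as_usize());
                }
            }
        }
    }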
diff --git a/ballista/client/tests/context_checks.rs
b/ballista/client/tests/context_checks.rs
index deba3bc3..7bc1d00b 100644
--- a/ballista/client/tests/context_checks.rs
+++ b/ballista/client/tests/context_checks.rs
@@ -123,6 +123,31 @@ mod supported {
> 0
);
+ // Verify timing metrics
+ let job_execution_time = metrics.sum_by_name("job_execution_time_ms").unwrap();
+ assert!(
+ job_execution_time.as_usize() > 0,
+ "job_execution_time_ms should be greater than 0"
+ );
+
+ let scheduling_time = metrics.sum_by_name("job_scheduling_in_ms").unwrap();
+ assert!(
+ scheduling_time.as_usize() > 0,
+ "job_scheduling_in_ms should be greater than 0"
+ );
+
+ let total_time = metrics.sum_by_name("total_query_time_ms").unwrap();
+ assert!(
+ total_time.as_usize() > 0,
+ "total_query_time_ms should be greater than 0"
+ );
+
+ // Total time should be at least as long as execution time
+ assert!(
+ total_time.as_usize() >= job_execution_time.as_usize(),
+ "total_query_time_ms should be >= job_execution_time_ms"
+ );
+
Ok(())
}
@@ -182,6 +207,20 @@ mod supported {
assert_batches_eq!(expected, &result);
+ // Verify timing metrics
+ let metrics = plan.metrics().unwrap();
+ let job_execution_time = metrics.sum_by_name("job_execution_time_ms").unwrap();
+ assert!(
+ job_execution_time.as_usize() > 0,
+ "job_execution_time_ms should be greater than 0"
+ );
+
+ let total_time = metrics.sum_by_name("total_query_time_ms").unwrap();
+ assert!(
+ total_time.as_usize() >= job_execution_time.as_usize(),
+ "total_query_time_ms should be >= job_execution_time_ms"
+ );
+
Ok(())
}
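The scheduler reports three epoch-millisecond timestamps on a successful job (queued_at, started_at, ended_at), and the second file derives the new durations from them. A standalone sketch of that arithmetic, with illustrative values:

    // Mirrors the timestamp arithmetic in execute_query below; all inputs are
    // epoch-millisecond timestamps reported by the scheduler, and saturating_sub
    // guards against underflow if the reported clocks disagree.
    fn derive_durations(queued_at: u64, started_at: u64, ended_at: u64) -> (u64, u64) {
        let scheduling_ms = started_at.saturating_sub(queued_at); // time spent queued
        let execution_ms = ended_at.saturating_sub(started_at); // server-side work
        (scheduling_ms, execution_ms)
    }

    fn main() {
        // Illustrative values: queued at t=1000ms, started at t=1250ms, ended at t=4250ms.
        let (scheduling_ms, execution_ms) = derive_durations(1_000, 1_250, 4_250);
        assert_eq!(scheduling_ms, 250);
        assert_eq!(execution_ms, 3_000);
    }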
diff --git a/ballista/core/src/execution_plans/distributed_query.rs
b/ballista/core/src/execution_plans/distributed_query.rs
index 8a084f96..96981b92 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -71,8 +71,12 @@ pub struct DistributedQueryExec<T: 'static + AsLogicalPlan> {
/// Plan properties
properties: PlanProperties,
/// Execution metrics, currently exposes:
- /// - row count
- /// - transferred_bytes
+ /// - output_rows: Total number of rows returned
+ /// - transferred_bytes: Total bytes transferred from executors
+ /// - job_execution_time_ms: Server-side execution time on the cluster (ended_at - started_at)
+ /// - job_scheduling_in_ms: Time the job waited in the scheduler queue (started_at - queued_at)
+ /// - total_query_time_ms: End-to-end time observed by the client, from
+ /// submission until the job completes
metrics: ExecutionPlanMetricsSet,
}
@@ -234,6 +238,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
let metric_row_count = MetricBuilder::new(&self.metrics).output_rows(partition);
let metric_total_bytes =
MetricBuilder::new(&self.metrics).counter("transferred_bytes", partition);
+
let stream = futures::stream::once(
execute_query(
self.scheduler_url.clone(),
@@ -241,6 +246,8 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
query,
self.config.default_grpc_client_max_message_size(),
GrpcClientConfig::from(&self.config),
+ Arc::new(self.metrics.clone()),
+ partition,
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
@@ -278,7 +285,12 @@ async fn execute_query(
query: ExecuteQueryParams,
max_message_size: usize,
grpc_config: GrpcClientConfig,
+ metrics: Arc<ExecutionPlanMetricsSet>,
+ partition: usize,
) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
+ // Capture query submission time for total_query_time_ms
+ let query_start_time = std::time::Instant::now();
+
info!("Connecting to Ballista scheduler at {scheduler_url}");
// TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again
let connection = create_grpc_client_connection(scheduler_url, &grpc_config)
@@ -351,15 +363,43 @@ async fn execute_query(
break Err(DataFusionError::Execution(msg));
}
Some(job_status::Status::Successful(SuccessfulJob {
+ queued_at,
started_at,
ended_at,
partition_location,
..
})) => {
- let duration = ended_at.saturating_sub(started_at);
- let duration = Duration::from_millis(duration);
+ // Calculate job execution time (server-side execution)
+ let job_execution_ms = ended_at.saturating_sub(started_at);
+ let duration = Duration::from_millis(job_execution_ms);
info!("Job {job_id} finished executing in {duration:?} ");
+
+ // Calculate scheduling time (server-side queue time):
+ // the gap between the job being queued and execution starting
+ let scheduling_ms = started_at.saturating_sub(queued_at);
+
+ // Calculate total query time (end-to-end from the client's perspective)
+ let total_elapsed = query_start_time.elapsed();
+ let total_ms = total_elapsed.as_millis();
+
+ // Set timing metrics
+ let metric_job_execution = MetricBuilder::new(&metrics)
+ .gauge("job_execution_time_ms", partition);
+ metric_job_execution.set(job_execution_ms as usize);
+
+ let metric_scheduling =
+ MetricBuilder::new(&metrics).gauge("job_scheduling_in_ms", partition);
+ metric_scheduling.set(scheduling_ms as usize);
+
+ let metric_total_time =
+ MetricBuilder::new(&metrics).gauge("total_query_time_ms", partition);
+ metric_total_time.set(total_ms as usize);
+
+ // Note: data_transfer_time_ms is not set here because partition fetching
+ // happens lazily when the stream is consumed, not during execute_query.
+ // This could be added in a future enhancement by wrapping the stream.
+
let streams = partition_location.into_iter().map(move |partition| {
let f = fetch_partition(partition, max_message_size, true)
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
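For readers unfamiliar with DataFusion's metric plumbing, the gauges above follow the standard pattern: register the metric against the shared ExecutionPlanMetricsSet, then set it once the value is known. A minimal self-contained sketch; the value recorded here is illustrative only:

    use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

    fn main() {
        let metrics = ExecutionPlanMetricsSet::new();
        let partition = 0;

        // Register a gauge under the shared metrics set, then record a value.
        let total_time = MetricBuilder::new(&metrics).gauge("total_query_time_ms", partition);
        total_time.set(42); // e.g. Instant::elapsed().as_millis() as usize

        // Aggregated view, as plan.metrics() would expose it to a client.
        let snapshot = metrics.clone_inner();
        assert_eq!(
            snapshot.sum_by_name("total_query_time_ms").unwrap().as_usize(),
            42
        );
    }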
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]