This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 32f281a2 feat: capture more metrics in distributed_query (#1353)
32f281a2 is described below

commit 32f281a2a7cada087c7eebe6d3a4cbd355d45af0
Author: Hoang Pham <[email protected]>
AuthorDate: Fri Jan 2 19:12:10 2026 +0700

    feat: capture more metrics in distributed_query (#1353)
    
    * add more metrics
    * use queue_at
    
    
    
    ---------
    
    Signed-off-by: Hoang Pham <[email protected]>
---
 ballista/client/tests/context_checks.rs            | 39 ++++++++++++++++++
 .../core/src/execution_plans/distributed_query.rs  | 48 ++++++++++++++++++++--
 2 files changed, 83 insertions(+), 4 deletions(-)

diff --git a/ballista/client/tests/context_checks.rs 
b/ballista/client/tests/context_checks.rs
index deba3bc3..7bc1d00b 100644
--- a/ballista/client/tests/context_checks.rs
+++ b/ballista/client/tests/context_checks.rs
@@ -123,6 +123,31 @@ mod supported {
                 > 0
         );
 
+        // Verify timing metrics
+        let job_execution_time = 
metrics.sum_by_name("job_execution_time_ms").unwrap();
+        assert!(
+            job_execution_time.as_usize() > 0,
+            "job_execution_time_ms should be greater than 0"
+        );
+
+        let scheduling_time = 
metrics.sum_by_name("job_scheduling_in_ms").unwrap();
+        assert!(
+            scheduling_time.as_usize() > 0,
+            "job_scheduling_in_ms should be non-negative"
+        );
+
+        let total_time = metrics.sum_by_name("total_query_time_ms").unwrap();
+        assert!(
+            total_time.as_usize() > 0,
+            "total_query_time_ms should be greater than 0"
+        );
+
+        // Total time should be at least as long as execution time
+        assert!(
+            total_time.as_usize() >= job_execution_time.as_usize(),
+            "total_query_time_ms should be >= job_execution_time_ms"
+        );
+
         Ok(())
     }
 
@@ -182,6 +207,20 @@ mod supported {
 
         assert_batches_eq!(expected, &result);
 
+        // Verify timing metrics
+        let metrics = plan.metrics().unwrap();
+        let job_execution_time = 
metrics.sum_by_name("job_execution_time_ms").unwrap();
+        assert!(
+            job_execution_time.as_usize() > 0,
+            "job_execution_time_ms should be greater than 0"
+        );
+
+        let total_time = metrics.sum_by_name("total_query_time_ms").unwrap();
+        assert!(
+            total_time.as_usize() >= job_execution_time.as_usize(),
+            "total_query_time_ms should be >= job_execution_time_ms"
+        );
+
         Ok(())
     }
 
diff --git a/ballista/core/src/execution_plans/distributed_query.rs 
b/ballista/core/src/execution_plans/distributed_query.rs
index 8a084f96..96981b92 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -71,8 +71,12 @@ pub struct DistributedQueryExec<T: 'static + AsLogicalPlan> {
     /// Plan properties
     properties: PlanProperties,
     /// Execution metrics, currently exposes:
-    /// - row count
-    /// - transferred_bytes
+    /// - output_rows: Total number of rows returned
+    /// - transferred_bytes: Total bytes transferred from executors
+    /// - job_execution_time_ms: Time spent executing on the cluster 
(server-side)
+    /// - job_scheduling_in_ms: Time from query submission to job start 
(includes queue time)
+    /// - job_execution_time_ms: Time spent executing on the cluster (ended_at 
- started_at)
+    /// - job_scheduling_in_ms: Time job waited in scheduler queue (started_at 
- queued_at)
     metrics: ExecutionPlanMetricsSet,
 }
 
@@ -234,6 +238,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for 
DistributedQueryExec<T> {
         let metric_row_count = 
MetricBuilder::new(&self.metrics).output_rows(partition);
         let metric_total_bytes =
             MetricBuilder::new(&self.metrics).counter("transferred_bytes", 
partition);
+
         let stream = futures::stream::once(
             execute_query(
                 self.scheduler_url.clone(),
@@ -241,6 +246,8 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for 
DistributedQueryExec<T> {
                 query,
                 self.config.default_grpc_client_max_message_size(),
                 GrpcClientConfig::from(&self.config),
+                Arc::new(self.metrics.clone()),
+                partition,
             )
             .map_err(|e| ArrowError::ExternalError(Box::new(e))),
         )
@@ -278,7 +285,12 @@ async fn execute_query(
     query: ExecuteQueryParams,
     max_message_size: usize,
     grpc_config: GrpcClientConfig,
+    metrics: Arc<ExecutionPlanMetricsSet>,
+    partition: usize,
 ) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
+    // Capture query submission time for total_query_time_ms
+    let query_start_time = std::time::Instant::now();
+
     info!("Connecting to Ballista scheduler at {scheduler_url}");
     // TODO reuse the scheduler to avoid connecting to the Ballista scheduler 
again and again
     let connection = create_grpc_client_connection(scheduler_url, &grpc_config)
@@ -351,15 +363,43 @@ async fn execute_query(
                 break Err(DataFusionError::Execution(msg));
             }
             Some(job_status::Status::Successful(SuccessfulJob {
+                queued_at,
                 started_at,
                 ended_at,
                 partition_location,
                 ..
             })) => {
-                let duration = ended_at.saturating_sub(started_at);
-                let duration = Duration::from_millis(duration);
+                // Calculate job execution time (server-side execution)
+                let job_execution_ms = ended_at.saturating_sub(started_at);
+                let duration = Duration::from_millis(job_execution_ms);
 
                 info!("Job {job_id} finished executing in {duration:?} ");
+
+                // Calculate scheduling time (server-side queue time)
+                // This includes network latency and actual queue time
+                let scheduling_ms = started_at.saturating_sub(queued_at);
+
+                // Calculate total query time (end-to-end from client 
perspective)
+                let total_elapsed = query_start_time.elapsed();
+                let total_ms = total_elapsed.as_millis();
+
+                // Set timing metrics
+                let metric_job_execution = MetricBuilder::new(&metrics)
+                    .gauge("job_execution_time_ms", partition);
+                metric_job_execution.set(job_execution_ms as usize);
+
+                let metric_scheduling =
+                    MetricBuilder::new(&metrics).gauge("job_scheduling_in_ms", 
partition);
+                metric_scheduling.set(scheduling_ms as usize);
+
+                let metric_total_time =
+                    MetricBuilder::new(&metrics).gauge("total_query_time_ms", 
partition);
+                metric_total_time.set(total_ms as usize);
+
+                // Note: data_transfer_time_ms is not set here because 
partition fetching
+                // happens lazily when the stream is consumed, not during 
execute_query.
+                // This could be added in a future enhancement by wrapping the 
stream.
+
                 let streams = partition_location.into_iter().map(move 
|partition| {
                     let f = fetch_partition(partition, max_message_size, true)
                         .map_err(|e| ArrowError::ExternalError(Box::new(e)));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to