This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 40e72e68 Ballista Executor report plan/operators metrics to Ballista 
Scheduler  (#124)
40e72e68 is described below

commit 40e72e68c2cf777b265fafe36c7664fa2d0d20bd
Author: mingmwang <[email protected]>
AuthorDate: Wed Aug 17 05:54:05 2022 +0800

    Ballista Executor report plan/operators metrics to Ballista Scheduler  
(#124)
    
    * Add timeout and keep-alive settings for Grpc Client
    
    * Add timeout and keep-alive settings for Grpc Server
    
    * move server settings to utils
    
    * fix fmt
    
    * set tcp_nodelay to true explicitly
    
    * Ballista Executor report plan/operators metrics to Ballista Scheduler
    
    * Resolve review comments
    
    * Fix plan display with metrics
    
    * fix shuffle writer metrics collection
    
    * Remove Keyspace::QueuedJobs (#134)
    
    * Remove Keyspace::QueuedJobs
    
    * Fix UT
    
    * Fix cargo clippy for rust 1.63
    
    Co-authored-by: yangzhong <[email protected]>
    
    Co-authored-by: yahoNanJing <[email protected]>
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/rust/core/proto/ballista.proto            |  37 ++++++
 .../rust/core/src/serde/scheduler/from_proto.rs    |  98 ++++++++++++++++
 ballista/rust/core/src/serde/scheduler/to_proto.rs |  68 +++++++++++
 ballista/rust/core/src/utils.rs                    |  14 +++
 ballista/rust/executor/src/execution_loop.rs       |  18 ++-
 ballista/rust/executor/src/executor.rs             |  26 +++--
 ballista/rust/executor/src/executor_server.rs      |  23 +++-
 ballista/rust/executor/src/lib.rs                  |  15 ++-
 ballista/rust/executor/src/metrics/mod.rs          |   7 +-
 ballista/rust/scheduler/src/display.rs             | 128 +++++++++++++++++++++
 ballista/rust/scheduler/src/lib.rs                 |   1 +
 ballista/rust/scheduler/src/planner.rs             |   2 +-
 .../scheduler/src/scheduler_server/event_loop.rs   |   1 +
 .../rust/scheduler/src/scheduler_server/mod.rs     |   3 +
 .../rust/scheduler/src/state/execution_graph.rs    | 107 ++++++++++++++++-
 ballista/rust/scheduler/src/state/task_manager.rs  |  20 ++++
 16 files changed, 541 insertions(+), 27 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index 4e2c55f6..04ada4bc 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -429,6 +429,7 @@ message ExecutionGraphStage {
   repeated TaskStatus task_statuses = 6;
   uint32 output_link = 7;
   bool resolved = 8;
+  repeated OperatorMetricsSet stage_metrics = 9;
 }
 
 message ExecutionGraph {
@@ -503,6 +504,41 @@ message ColumnStats {
   uint32 distinct_count = 4;
 }
 
+message OperatorMetricsSet {
+  repeated OperatorMetric metrics = 1;
+}
+
+
+message NamedCount {
+  string name = 1;
+  uint64 value = 2;
+}
+
+message NamedGauge {
+  string name = 1;
+  uint64 value = 2;
+}
+
+message NamedTime {
+  string name = 1;
+  uint64 value = 2;
+}
+
+message OperatorMetric {
+  oneof metric {
+    uint64 output_rows = 1;
+    uint64 elapse_time = 2;
+    uint64 spill_count = 3;
+    uint64 spilled_bytes = 4;
+    uint64 current_memory_usage = 5;
+    NamedCount count = 6;
+    NamedGauge gauge = 7;
+    NamedTime time = 8;
+    int64 start_timestamp = 9;
+    int64 end_timestamp = 10;
+  }
+}
+
 // Used by scheduler
 message ExecutorMetadata {
   string id = 1;
@@ -594,6 +630,7 @@ message TaskStatus {
     FailedTask failed = 3;
     CompletedTask completed = 4;
   }
+  repeated OperatorMetricsSet metrics = 5;
 }
 
 message PollWorkParams {
diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs 
b/ballista/rust/core/src/serde/scheduler/from_proto.rs
index b401f1fd..970a5878 100644
--- a/ballista/rust/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -15,11 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use chrono::{TimeZone, Utc};
+use datafusion::physical_plan::metrics::{
+    Count, Gauge, MetricValue, MetricsSet, Time, Timestamp,
+};
+use datafusion::physical_plan::Metric;
 use std::convert::TryInto;
+use std::sync::Arc;
+use std::time::Duration;
 
 use crate::error::BallistaError;
 use crate::serde::protobuf;
 use crate::serde::protobuf::action::ActionType;
+use crate::serde::protobuf::{operator_metric, NamedCount, NamedGauge, 
NamedTime};
 use crate::serde::scheduler::{Action, PartitionId, PartitionLocation, 
PartitionStats};
 
 impl TryInto<Action> for protobuf::Action {
@@ -104,3 +112,93 @@ impl TryInto<PartitionLocation> for 
protobuf::PartitionLocation {
         })
     }
 }
+
+impl TryInto<MetricValue> for protobuf::OperatorMetric {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<MetricValue, Self::Error> {
+        match self.metric {
+            Some(operator_metric::Metric::OutputRows(value)) => {
+                let count = Count::new();
+                count.add(value as usize);
+                Ok(MetricValue::OutputRows(count))
+            }
+            Some(operator_metric::Metric::ElapseTime(value)) => {
+                let time = Time::new();
+                time.add_duration(Duration::from_nanos(value));
+                Ok(MetricValue::ElapsedCompute(time))
+            }
+            Some(operator_metric::Metric::SpillCount(value)) => {
+                let count = Count::new();
+                count.add(value as usize);
+                Ok(MetricValue::SpillCount(count))
+            }
+            Some(operator_metric::Metric::SpilledBytes(value)) => {
+                let count = Count::new();
+                count.add(value as usize);
+                Ok(MetricValue::SpilledBytes(count))
+            }
+            Some(operator_metric::Metric::CurrentMemoryUsage(value)) => {
+                let gauge = Gauge::new();
+                gauge.add(value as usize);
+                Ok(MetricValue::CurrentMemoryUsage(gauge))
+            }
+            Some(operator_metric::Metric::Count(NamedCount { name, value })) 
=> {
+                let count = Count::new();
+                count.add(value as usize);
+                Ok(MetricValue::Count {
+                    name: name.into(),
+                    count,
+                })
+            }
+            Some(operator_metric::Metric::Gauge(NamedGauge { name, value })) 
=> {
+                let gauge = Gauge::new();
+                gauge.add(value as usize);
+                Ok(MetricValue::Gauge {
+                    name: name.into(),
+                    gauge,
+                })
+            }
+            Some(operator_metric::Metric::Time(NamedTime { name, value })) => {
+                let time = Time::new();
+                time.add_duration(Duration::from_nanos(value));
+                Ok(MetricValue::Time {
+                    name: name.into(),
+                    time,
+                })
+            }
+            Some(operator_metric::Metric::StartTimestamp(value)) => {
+                let timestamp = Timestamp::new();
+                timestamp.set(Utc.timestamp_nanos(value));
+                Ok(MetricValue::StartTimestamp(timestamp))
+            }
+            Some(operator_metric::Metric::EndTimestamp(value)) => {
+                let timestamp = Timestamp::new();
+                timestamp.set(Utc.timestamp_nanos(value));
+                Ok(MetricValue::EndTimestamp(timestamp))
+            }
+            None => Err(BallistaError::General(
+                "scheduler::from_proto(OperatorMetric) metric is 
None.".to_owned(),
+            )),
+        }
+    }
+}
+
+impl TryInto<MetricsSet> for protobuf::OperatorMetricsSet {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<MetricsSet, Self::Error> {
+        let mut ms = MetricsSet::new();
+        let metrics = self
+            .metrics
+            .into_iter()
+            .map(|m| m.try_into())
+            .collect::<Result<Vec<_>, BallistaError>>()?;
+
+        for value in metrics {
+            let new_metric = Arc::new(Metric::new(value, None));
+            ms.push(new_metric)
+        }
+        Ok(ms)
+    }
+}
diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs 
b/ballista/rust/core/src/serde/scheduler/to_proto.rs
index 3a1789f2..815bc96d 100644
--- a/ballista/rust/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -15,11 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
 use std::convert::TryInto;
 
 use crate::error::BallistaError;
 use crate::serde::protobuf;
 use crate::serde::protobuf::action::ActionType;
+use crate::serde::protobuf::{operator_metric, NamedCount, NamedGauge, 
NamedTime};
 use crate::serde::scheduler::{Action, PartitionId, PartitionLocation, 
PartitionStats};
 use datafusion::physical_plan::Partitioning;
 
@@ -103,3 +105,69 @@ pub fn hash_partitioning_to_proto(
         ))),
     }
 }
+
+impl TryInto<protobuf::OperatorMetric> for &MetricValue {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<protobuf::OperatorMetric, Self::Error> {
+        match self {
+            MetricValue::OutputRows(count) => Ok(protobuf::OperatorMetric {
+                metric: Some(operator_metric::Metric::OutputRows(count.value() 
as u64)),
+            }),
+            MetricValue::ElapsedCompute(time) => Ok(protobuf::OperatorMetric {
+                metric: Some(operator_metric::Metric::ElapseTime(time.value() 
as u64)),
+            }),
+            MetricValue::SpillCount(count) => Ok(protobuf::OperatorMetric {
+                metric: Some(operator_metric::Metric::SpillCount(count.value() 
as u64)),
+            }),
+            MetricValue::SpilledBytes(count) => Ok(protobuf::OperatorMetric {
+                metric: 
Some(operator_metric::Metric::SpilledBytes(count.value() as u64)),
+            }),
+            MetricValue::CurrentMemoryUsage(gauge) => 
Ok(protobuf::OperatorMetric {
+                metric: Some(operator_metric::Metric::CurrentMemoryUsage(
+                    gauge.value() as u64
+                )),
+            }),
+            MetricValue::Count { name, count } => Ok(protobuf::OperatorMetric {
+                metric: Some(operator_metric::Metric::Count(NamedCount {
+                    name: name.to_string(),
+                    value: count.value() as u64,
+                })),
+            }),
+            MetricValue::Gauge { name, gauge } => Ok(protobuf::OperatorMetric {
+                metric: Some(operator_metric::Metric::Gauge(NamedGauge {
+                    name: name.to_string(),
+                    value: gauge.value() as u64,
+                })),
+            }),
+            MetricValue::Time { name, time } => Ok(protobuf::OperatorMetric {
+                metric: Some(operator_metric::Metric::Time(NamedTime {
+                    name: name.to_string(),
+                    value: time.value() as u64,
+                })),
+            }),
+            MetricValue::StartTimestamp(timestamp) => 
Ok(protobuf::OperatorMetric {
+                metric: Some(operator_metric::Metric::StartTimestamp(
+                    timestamp.value().map(|m| 
m.timestamp_nanos()).unwrap_or(0),
+                )),
+            }),
+            MetricValue::EndTimestamp(timestamp) => 
Ok(protobuf::OperatorMetric {
+                metric: Some(operator_metric::Metric::EndTimestamp(
+                    timestamp.value().map(|m| 
m.timestamp_nanos()).unwrap_or(0),
+                )),
+            }),
+        }
+    }
+}
+
+impl TryInto<protobuf::OperatorMetricsSet> for MetricsSet {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<protobuf::OperatorMetricsSet, Self::Error> {
+        let metrics = self
+            .iter()
+            .map(|m| m.value().try_into())
+            .collect::<Result<Vec<_>, BallistaError>>()?;
+        Ok(protobuf::OperatorMetricsSet { metrics })
+    }
+}
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index d356703a..ae8d5773 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -38,6 +38,7 @@ use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::hash_join::HashJoinExec;
+use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream};
@@ -342,3 +343,16 @@ pub fn create_grpc_server() -> Server {
         .http2_keepalive_interval(Option::Some(Duration::from_secs(300)))
         .http2_keepalive_timeout(Option::Some(Duration::from_secs(20)))
 }
+
+pub fn collect_plan_metrics(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
+    let mut metrics_array = Vec::<MetricsSet>::new();
+    if let Some(metrics) = plan.metrics() {
+        metrics_array.push(metrics);
+    }
+    plan.children().iter().for_each(|c| {
+        collect_plan_metrics(c.as_ref())
+            .into_iter()
+            .for_each(|e| metrics_array.push(e))
+    });
+    metrics_array
+}
diff --git a/ballista/rust/executor/src/execution_loop.rs 
b/ballista/rust/executor/src/execution_loop.rs
index c9408124..471377e4 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -28,12 +28,14 @@ use ballista_core::error::BallistaError;
 use 
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
 use ballista_core::serde::scheduler::ExecutorSpecification;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+use ballista_core::utils::collect_plan_metrics;
 use datafusion::execution::context::TaskContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use futures::FutureExt;
 use log::{debug, error, info, trace, warn};
 use std::any::Any;
 use std::collections::HashMap;
+use std::convert::TryInto;
 use std::error::Error;
 use std::ops::Deref;
 use std::sync::atomic::{AtomicUsize, Ordering};
@@ -183,14 +185,18 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, 
U: 'static + AsExecution
         plan.schema().as_ref(),
     )?;
 
+    let shuffle_writer_plan = executor.new_shuffle_writer(
+        task_id.job_id.clone(),
+        task_id.stage_id as usize,
+        plan,
+    )?;
     tokio::spawn(async move {
         use std::panic::AssertUnwindSafe;
-
         let execution_result = match 
AssertUnwindSafe(executor.execute_shuffle_write(
             task_id.job_id.clone(),
             task_id.stage_id as usize,
             task_id.partition_id as usize,
-            plan,
+            shuffle_writer_plan.clone(),
             task_context,
             shuffle_output_partitioning,
         ))
@@ -209,10 +215,18 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, 
U: 'static + AsExecution
         debug!("Statistics: {:?}", execution_result);
         available_tasks_slots.fetch_add(1, Ordering::SeqCst);
 
+        let plan_metrics = collect_plan_metrics(shuffle_writer_plan.as_ref());
+        let operator_metrics = plan_metrics
+            .into_iter()
+            .map(|m| m.try_into())
+            .collect::<Result<Vec<_>, BallistaError>>()
+            .ok();
+
         let _ = task_status_sender.send(as_task_status(
             execution_result,
             executor.metadata.id.clone(),
             task_id,
+            operator_metrics,
         ));
     });
 
diff --git a/ballista/rust/executor/src/executor.rs 
b/ballista/rust/executor/src/executor.rs
index ad643ba1..34af3427 100644
--- a/ballista/rust/executor/src/executor.rs
+++ b/ballista/rust/executor/src/executor.rs
@@ -88,16 +88,30 @@ impl Executor {
         job_id: String,
         stage_id: usize,
         part: usize,
-        plan: Arc<dyn ExecutionPlan>,
+        shuffle_writer: Arc<ShuffleWriterExec>,
         task_ctx: Arc<TaskContext>,
         _shuffle_output_partitioning: Option<Partitioning>,
     ) -> Result<Vec<protobuf::ShuffleWritePartition>, BallistaError> {
+        let partitions = shuffle_writer.execute_shuffle_write(part, 
task_ctx).await?;
+        self.metrics_collector
+            .record_stage(&job_id, stage_id, part, shuffle_writer);
+
+        Ok(partitions)
+    }
+
+    /// Recreate the shuffle writer with the correct working directory.
+    pub fn new_shuffle_writer(
+        &self,
+        job_id: String,
+        stage_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<ShuffleWriterExec>, BallistaError> {
         let exec = if let Some(shuffle_writer) =
             plan.as_any().downcast_ref::<ShuffleWriterExec>()
         {
             // recreate the shuffle writer with the correct working directory
             ShuffleWriterExec::try_new(
-                job_id.clone(),
+                job_id,
                 stage_id,
                 plan.children()[0].clone(),
                 self.work_dir.clone(),
@@ -109,13 +123,7 @@ impl Executor {
                     .to_string(),
             ))
         }?;
-
-        let partitions = exec.execute_shuffle_write(part, task_ctx).await?;
-
-        self.metrics_collector
-            .record_stage(&job_id, stage_id, part, exec);
-
-        Ok(partitions)
+        Ok(Arc::new(exec))
     }
 
     pub fn work_dir(&self) -> &str {
diff --git a/ballista/rust/executor/src/executor_server.rs 
b/ballista/rust/executor/src/executor_server.rs
index e484e05a..591c88a1 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::collections::HashMap;
+use std::convert::TryInto;
 use std::ops::Deref;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -39,7 +40,7 @@ use ballista_core::serde::protobuf::{
 };
 use ballista_core::serde::scheduler::ExecutorState;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
-use ballista_core::utils::create_grpc_server;
+use ballista_core::utils::{collect_plan_metrics, create_grpc_server};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -233,13 +234,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorServer<T,
             plan.schema().as_ref(),
         )?;
 
+        let shuffle_writer_plan = self.executor.new_shuffle_writer(
+            task_id.job_id.clone(),
+            task_id.stage_id as usize,
+            plan,
+        )?;
+
         let execution_result = self
             .executor
             .execute_shuffle_write(
                 task_id.job_id.clone(),
                 task_id.stage_id as usize,
                 task_id.partition_id as usize,
-                plan,
+                shuffle_writer_plan.clone(),
                 task_context,
                 shuffle_output_partitioning,
             )
@@ -247,8 +254,18 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorServer<T,
         info!("Done with task {}", task_id_log);
         debug!("Statistics: {:?}", execution_result);
 
+        let plan_metrics = collect_plan_metrics(shuffle_writer_plan.as_ref());
+        let operator_metrics = plan_metrics
+            .into_iter()
+            .map(|m| m.try_into())
+            .collect::<Result<Vec<_>, BallistaError>>()?;
         let executor_id = &self.executor.metadata.id;
-        let task_status = as_task_status(execution_result, 
executor_id.clone(), task_id);
+        let task_status = as_task_status(
+            execution_result,
+            executor_id.clone(),
+            task_id,
+            Some(operator_metrics),
+        );
 
         let task_status_sender = self.executor_env.tx_task_status.clone();
         task_status_sender.send(task_status).await.unwrap();
diff --git a/ballista/rust/executor/src/lib.rs 
b/ballista/rust/executor/src/lib.rs
index 4d145b26..e93993b6 100644
--- a/ballista/rust/executor/src/lib.rs
+++ b/ballista/rust/executor/src/lib.rs
@@ -32,21 +32,27 @@ pub use standalone::new_standalone_executor;
 use log::info;
 
 use ballista_core::serde::protobuf::{
-    task_status, CompletedTask, FailedTask, PartitionId, ShuffleWritePartition,
-    TaskStatus,
+    task_status, CompletedTask, FailedTask, OperatorMetricsSet, PartitionId,
+    ShuffleWritePartition, TaskStatus,
 };
 
 pub fn as_task_status(
     execution_result: ballista_core::error::Result<Vec<ShuffleWritePartition>>,
     executor_id: String,
     task_id: PartitionId,
+    operator_metrics: Option<Vec<OperatorMetricsSet>>,
 ) -> TaskStatus {
+    let metrics = operator_metrics.unwrap_or_default();
     match execution_result {
         Ok(partitions) => {
-            info!("Task {:?} finished", task_id);
-
+            info!(
+                "Task {:?} finished with operator_metrics array size {}",
+                task_id,
+                metrics.len()
+            );
             TaskStatus {
                 task_id: Some(task_id),
+                metrics,
                 status: Some(task_status::Status::Completed(CompletedTask {
                     executor_id,
                     partitions,
@@ -59,6 +65,7 @@ pub fn as_task_status(
 
             TaskStatus {
                 task_id: Some(task_id),
+                metrics,
                 status: Some(task_status::Status::Failed(FailedTask {
                     error: format!("Task failed due to Tokio error: {}", 
error_msg),
                 })),
diff --git a/ballista/rust/executor/src/metrics/mod.rs 
b/ballista/rust/executor/src/metrics/mod.rs
index 2c7e1d50..9a735c50 100644
--- a/ballista/rust/executor/src/metrics/mod.rs
+++ b/ballista/rust/executor/src/metrics/mod.rs
@@ -17,6 +17,7 @@
 
 use ballista_core::execution_plans::ShuffleWriterExec;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use std::sync::Arc;
 
 /// `ExecutorMetricsCollector` records metrics for `ShuffleWriteExec`
 /// after they are executed.
@@ -30,7 +31,7 @@ pub trait ExecutorMetricsCollector: Send + Sync {
         job_id: &str,
         stage_id: usize,
         partition: usize,
-        plan: ShuffleWriterExec,
+        plan: Arc<ShuffleWriterExec>,
     );
 }
 
@@ -45,14 +46,14 @@ impl ExecutorMetricsCollector for LoggingMetricsCollector {
         job_id: &str,
         stage_id: usize,
         partition: usize,
-        plan: ShuffleWriterExec,
+        plan: Arc<ShuffleWriterExec>,
     ) {
         println!(
             "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
             job_id,
             stage_id,
             partition,
-            DisplayableExecutionPlan::with_metrics(&plan).indent()
+            DisplayableExecutionPlan::with_metrics(plan.as_ref()).indent()
         );
     }
 }
diff --git a/ballista/rust/scheduler/src/display.rs 
b/ballista/rust/scheduler/src/display.rs
new file mode 100644
index 00000000..e4557a6b
--- /dev/null
+++ b/ballista/rust/scheduler/src/display.rs
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Implementation of ballista physical plan display with metrics. See
+//! [`crate::physical_plan::displayable`] for examples of how to
+//! format
+
+use datafusion::logical_plan::{StringifiedPlan, ToStringifiedPlan};
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::{
+    accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor,
+};
+use std::fmt;
+
+/// Wraps an `ExecutionPlan` to display this plan with metrics 
collected/aggregated.
+/// The metrics must be collected in the same order as how we visit and 
display the plan.
+pub struct DisplayableBallistaExecutionPlan<'a> {
+    inner: &'a dyn ExecutionPlan,
+    metrics: &'a Vec<MetricsSet>,
+}
+
+impl<'a> DisplayableBallistaExecutionPlan<'a> {
+    /// Create a wrapper around an [`'ExecutionPlan'] which can be
+    /// pretty printed with aggregated metrics.
+    pub fn new(inner: &'a dyn ExecutionPlan, metrics: &'a Vec<MetricsSet>) -> 
Self {
+        Self { inner, metrics }
+    }
+
+    /// Return a `format`able structure that produces a single line
+    /// per node.
+    ///
+    /// ```text
+    /// ProjectionExec: expr=[a]
+    ///   CoalesceBatchesExec: target_batch_size=4096
+    ///     FilterExec: a < 5
+    ///       RepartitionExec: partitioning=RoundRobinBatch(16)
+    ///         CsvExec: source=...",
+    /// ```
+    pub fn indent(&self) -> impl fmt::Display + 'a {
+        struct Wrapper<'a> {
+            plan: &'a dyn ExecutionPlan,
+            metrics: &'a Vec<MetricsSet>,
+        }
+        impl<'a> fmt::Display for Wrapper<'a> {
+            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+                let t = DisplayFormatType::Default;
+                let mut visitor = IndentVisitor {
+                    t,
+                    f,
+                    indent: 0,
+                    metrics: self.metrics,
+                    metric_index: 0,
+                };
+                accept(self.plan, &mut visitor)
+            }
+        }
+        Wrapper {
+            plan: self.inner,
+            metrics: self.metrics,
+        }
+    }
+}
+
+/// Formats plans with a single line per node.
+struct IndentVisitor<'a, 'b> {
+    /// How to format each node
+    t: DisplayFormatType,
+    /// Write to this formatter
+    f: &'a mut fmt::Formatter<'b>,
+    /// Indent size
+    indent: usize,
+    /// The metrics along with the plan
+    metrics: &'a Vec<MetricsSet>,
+    /// The metric index
+    metric_index: usize,
+}
+
+impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
+    type Error = fmt::Error;
+    fn pre_visit(
+        &mut self,
+        plan: &dyn ExecutionPlan,
+    ) -> std::result::Result<bool, Self::Error> {
+        write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
+        plan.fmt_as(self.t, self.f)?;
+        if let Some(metrics) = self.metrics.get(self.metric_index) {
+            let metrics = metrics
+                .aggregate_by_partition()
+                .sorted_for_display()
+                .timestamps_removed();
+            write!(self.f, ", metrics=[{}]", metrics)?;
+        } else {
+            write!(self.f, ", metrics=[]")?;
+        }
+        writeln!(self.f)?;
+        self.indent += 1;
+        self.metric_index += 1;
+        Ok(true)
+    }
+
+    fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> Result<bool, 
Self::Error> {
+        self.indent -= 1;
+        Ok(true)
+    }
+}
+
+impl<'a> ToStringifiedPlan for DisplayableBallistaExecutionPlan<'a> {
+    fn to_stringified(
+        &self,
+        plan_type: datafusion::logical_plan::PlanType,
+    ) -> StringifiedPlan {
+        StringifiedPlan::new(plan_type, self.indent().to_string())
+    }
+}
diff --git a/ballista/rust/scheduler/src/lib.rs 
b/ballista/rust/scheduler/src/lib.rs
index 2c2fa410..838eb562 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -18,6 +18,7 @@
 #![doc = include_str ! ("../README.md")]
 
 pub mod api;
+pub mod display;
 pub mod planner;
 pub mod scheduler_server;
 #[cfg(feature = "sled")]
diff --git a/ballista/rust/scheduler/src/planner.rs 
b/ballista/rust/scheduler/src/planner.rs
index 1c946937..9c393bac 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -63,7 +63,7 @@ impl DistributedPlanner {
         job_id: &'a str,
         execution_plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Vec<Arc<ShuffleWriterExec>>> {
-        info!("planning query stages");
+        info!("planning query stages for job {}", job_id);
         let (new_plan, mut stages) =
             self.plan_query_stages_internal(job_id, execution_plan)?;
         stages.push(create_shuffle_writer(
diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs 
b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
index b0037ab0..d6397ba6 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
@@ -322,6 +322,7 @@ mod test {
                         stage_id: 1,
                         partition_id: 0,
                     }),
+                    metrics: vec![],
                     status: Some(task_status::Status::Completed(CompletedTask {
                         executor_id: "executor-1".to_string(),
                         partitions,
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs 
b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index a6a26d80..2fac229b 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -364,6 +364,7 @@ mod test {
                         executor_id: "executor-1".to_owned(),
                         partitions,
                     })),
+                    metrics: vec![],
                     task_id: Some(PartitionId {
                         job_id: job_id.to_owned(),
                         stage_id: task.partition.stage_id as u32,
@@ -493,6 +494,7 @@ mod test {
                                             partitions,
                                         },
                                     )),
+                                    metrics: vec![],
                                     task_id: Some(PartitionId {
                                         job_id: job_id.to_owned(),
                                         stage_id: task.partition.stage_id as 
u32,
@@ -627,6 +629,7 @@ mod test {
                                             error: "".to_string(),
                                         },
                                     )),
+                                    metrics: vec![],
                                     task_id: Some(PartitionId {
                                         job_id: job_id.to_owned(),
                                         stage_id: task.partition.stage_id as 
u32,
diff --git a/ballista/rust/scheduler/src/state/execution_graph.rs 
b/ballista/rust/scheduler/src/state/execution_graph.rs
index 1412f7e0..b13f7c58 100644
--- a/ballista/rust/scheduler/src/state/execution_graph.rs
+++ b/ballista/rust/scheduler/src/state/execution_graph.rs
@@ -15,12 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::display::DisplayableBallistaExecutionPlan;
 use crate::planner::DistributedPlanner;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
 
 use ballista_core::serde::protobuf::{
-    self, CompletedJob, JobStatus, QueuedJob, TaskStatus,
+    self, CompletedJob, JobStatus, OperatorMetricsSet, QueuedJob, TaskStatus,
 };
 use ballista_core::serde::protobuf::{job_status, FailedJob, 
ShuffleWritePartition};
 use ballista_core::serde::protobuf::{task_status, RunningTask};
@@ -28,14 +29,16 @@ use ballista_core::serde::scheduler::{
     ExecutorMetadata, PartitionId, PartitionLocation, PartitionStats,
 };
 use datafusion::physical_plan::{
-    accept, ExecutionPlan, ExecutionPlanVisitor, Partitioning,
+    accept, ExecutionPlan, ExecutionPlanVisitor, Metric, Partitioning,
 };
-use log::debug;
+use log::{debug, info};
 use std::collections::HashMap;
 use std::convert::TryInto;
 use std::fmt::{Debug, Formatter};
 
+use ballista_core::utils::collect_plan_metrics;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
 use std::sync::Arc;
 
 /// This data structure collects the partition locations for an 
`ExecutionStage`.
@@ -103,6 +106,8 @@ pub struct ExecutionStage {
     /// Flag indicating whether all input partitions have been resolved and 
the plan
     /// has UnresovledShuffleExec operators resolved to ShuffleReadExec 
operators.
     pub(crate) resolved: bool,
+    /// Combined metrics of the already finished tasks in the stage. If it is 
None, no task has finished yet.
+    pub(crate) stage_metrics: Option<Vec<MetricsSet>>,
 }
 
 impl Debug for ExecutionStage {
@@ -153,6 +158,7 @@ impl ExecutionStage {
             task_statuses: vec![None; num_tasks],
             output_link,
             resolved,
+            stage_metrics: None,
         }
     }
 
@@ -230,6 +236,60 @@ impl ExecutionStage {
         self.task_statuses[partition] = Some(status);
     }
 
+    /// Update and combine the task metrics into the stage metrics
+    pub fn update_task_metrics(
+        &mut self,
+        partition: usize,
+        metrics: Vec<OperatorMetricsSet>,
+    ) -> Result<()> {
+        if let Some(combined_metrics) = &mut self.stage_metrics {
+            if metrics.len() != combined_metrics.len() {
+                return Err(BallistaError::Internal(format!("Error updating 
task metrics to stage {}, task metrics array size {} does not equal \
+                with the stage metrics array size {} for task {}", 
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
+            }
+            let metrics_values_array = metrics
+                .into_iter()
+                .map(|ms| {
+                    ms.metrics
+                        .into_iter()
+                        .map(|m| m.try_into())
+                        .collect::<Result<Vec<_>>>()
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            let new_metrics_set = combined_metrics
+                .iter_mut()
+                .zip(metrics_values_array)
+                .map(|(first, second)| {
+                    Self::combine_metrics_set(first, second, partition)
+                })
+                .collect();
+            self.stage_metrics = Some(new_metrics_set)
+        } else {
+            let new_metrics_set = metrics
+                .into_iter()
+                .map(|ms| ms.try_into())
+                .collect::<Result<Vec<_>>>()?;
+            if !new_metrics_set.is_empty() {
+                self.stage_metrics = Some(new_metrics_set)
+            }
+        }
+        Ok(())
+    }
+
+    pub fn combine_metrics_set(
+        first: &mut MetricsSet,
+        second: Vec<MetricValue>,
+        partition: usize,
+    ) -> MetricsSet {
+        for metric_value in second {
+            // TODO recheck the label logic
+            let new_metric = Arc::new(Metric::new(metric_value, 
Some(partition)));
+            first.push(new_metric);
+        }
+        first.aggregate_by_partition()
+    }
+
     /// Add input partitions published from an input stage.
     pub fn add_input_partitions(
         &mut self,
@@ -467,8 +527,8 @@ impl ExecutionGraph {
         self.stages.values().all(|s| s.complete())
     }
 
-    /// Update task statuses in the graph. This will push shuffle partitions 
to their
-    /// respective shuffle read stages.
+    /// Update task statuses and task metrics in the graph.
+    /// This will also push shuffle partitions to their respective shuffle 
read stages.
     pub fn update_task_status(
         &mut self,
         executor: &ExecutorMetadata,
@@ -482,6 +542,7 @@ impl ExecutionGraph {
                         stage_id,
                         partition_id,
                     }),
+                metrics: operator_metrics,
                 status: Some(task_status),
             } = status
             {
@@ -497,6 +558,7 @@ impl ExecutionGraph {
                 let partition = partition_id as usize;
                 if let Some(stage) = self.stages.get_mut(&stage_id) {
                     stage.update_task_status(partition, task_status.clone());
+                    let stage_plan = stage.plan.clone();
                     let stage_complete = stage.complete();
 
                     // TODO Should be able to reschedule this task.
@@ -513,6 +575,40 @@ impl ExecutionGraph {
                     } else if let 
task_status::Status::Completed(completed_task) =
                         task_status
                     {
+                        // update task metrics for completed task
+                        stage.update_task_metrics(partition, 
operator_metrics)?;
+
+                        // if this stage is completed, we want to combine the 
stage metrics to plan's metric set and print out the plan
+                        if stage_complete && 
stage.stage_metrics.as_ref().is_some() {
+                            // The plan_metrics collected here is a snapshot 
clone from the plan metrics.
+                            // They are all empty now and need to be combined 
with the stage metrics in the ExecutionStages
+                            let mut plan_metrics =
+                                collect_plan_metrics(stage_plan.as_ref());
+                            let stage_metrics = stage
+                                .stage_metrics
+                                .as_ref()
+                                .expect("stage metrics should not be None.");
+                            if plan_metrics.len() != stage_metrics.len() {
+                                return 
Err(BallistaError::Internal(format!("Error combine stage metrics to plan for 
stage {},  plan metrics array size {} does not equal \
+                to the stage metrics array size {}", stage_id, 
plan_metrics.len(), stage_metrics.len())));
+                            }
+                            
plan_metrics.iter_mut().zip(stage_metrics).for_each(
+                                |(plan_metric, stage_metric)| {
+                                    stage_metric
+                                        .iter()
+                                        .for_each(|s| 
plan_metric.push(s.clone()));
+                                },
+                            );
+
+                            info!(
+                                "=== [{}/{}/{}] Stage finished, physical plan 
with metrics ===\n{}\n",
+                                job_id,
+                                stage_id,
+                                partition,
+                                
DisplayableBallistaExecutionPlan::new(stage_plan.as_ref(), 
plan_metrics.as_ref()).indent()
+                            );
+                        }
+
                         let locations = partition_to_location(
                             self.job_id.as_str(),
                             stage_id,
@@ -808,6 +904,7 @@ mod test {
                     executor_id: "executor-1".to_owned(),
                     partitions,
                 })),
+                metrics: vec![],
                 task_id: Some(protobuf::PartitionId {
                     job_id: job_id.clone(),
                     stage_id: task.partition.stage_id as u32,
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs 
b/ballista/rust/scheduler/src/state/task_manager.rs
index a5bc049a..cc329263 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -590,6 +590,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                     },
                 );
             }
+            let stage_metrics = if stage.stage_metrics.is_empty() {
+                None
+            } else {
+                let ms = stage
+                    .stage_metrics
+                    .into_iter()
+                    .map(|m| m.try_into())
+                    .collect::<Result<Vec<_>>>()?;
+                Some(ms)
+            };
 
             let execution_stage = ExecutionStage {
                 stage_id: stage.stage_id as usize,
@@ -600,6 +610,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                 task_statuses,
                 output_link,
                 resolved: stage.resolved,
+                stage_metrics,
             };
             stages.insert(stage_id, execution_stage);
         }
@@ -681,6 +692,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                                 stage_id: stage_id as u32,
                                 partition_id: partition as u32,
                             }),
+                            // task metrics should not persist.
+                            metrics: vec![],
                             status: Some(status),
                         })
                     })
@@ -689,6 +702,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                 let output_partitioning =
                     
hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
 
+                let stage_metrics = stage
+                    .stage_metrics
+                    .unwrap_or_default()
+                    .into_iter()
+                    .map(|m| m.try_into())
+                    .collect::<Result<Vec<_>>>()?;
                 Ok(protobuf::ExecutionGraphStage {
                     stage_id: stage_id as u64,
                     partitions: stage.partitions as u32,
@@ -698,6 +717,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                     task_statuses,
                     output_link,
                     resolved: stage.resolved,
+                    stage_metrics,
                 })
             })
             .collect::<Result<Vec<_>>>()?;

Reply via email to