This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 40e72e68 Ballista Executor report plan/operators metrics to Ballista
Scheduler (#124)
40e72e68 is described below
commit 40e72e68c2cf777b265fafe36c7664fa2d0d20bd
Author: mingmwang <[email protected]>
AuthorDate: Wed Aug 17 05:54:05 2022 +0800
Ballista Executor report plan/operators metrics to Ballista Scheduler
(#124)
* Add timeout and keep-alive settings for Grpc Client
* Add timeout and keep-alive settings for Grpc Server
* move server settings to utils
* fix fmt
* set tcp_nodelay to true explicitly
* Ballista Executor report plan/operators metrics to Ballista Scheduler
* Resolve review comments
* Fix plan display with metrics
* fix shuffle writer metrics collection
* Remove Keyspace::QueuedJobs (#134)
* Remove Keyspace::QueuedJobs
* Fix UT
* Fix cargo clippy for rust 1.63
Co-authored-by: yangzhong <[email protected]>
Co-authored-by: yahoNanJing <[email protected]>
Co-authored-by: yangzhong <[email protected]>
---
ballista/rust/core/proto/ballista.proto | 37 ++++++
.../rust/core/src/serde/scheduler/from_proto.rs | 98 ++++++++++++++++
ballista/rust/core/src/serde/scheduler/to_proto.rs | 68 +++++++++++
ballista/rust/core/src/utils.rs | 14 +++
ballista/rust/executor/src/execution_loop.rs | 18 ++-
ballista/rust/executor/src/executor.rs | 26 +++--
ballista/rust/executor/src/executor_server.rs | 23 +++-
ballista/rust/executor/src/lib.rs | 15 ++-
ballista/rust/executor/src/metrics/mod.rs | 7 +-
ballista/rust/scheduler/src/display.rs | 128 +++++++++++++++++++++
ballista/rust/scheduler/src/lib.rs | 1 +
ballista/rust/scheduler/src/planner.rs | 2 +-
.../scheduler/src/scheduler_server/event_loop.rs | 1 +
.../rust/scheduler/src/scheduler_server/mod.rs | 3 +
.../rust/scheduler/src/state/execution_graph.rs | 107 ++++++++++++++++-
ballista/rust/scheduler/src/state/task_manager.rs | 20 ++++
16 files changed, 541 insertions(+), 27 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index 4e2c55f6..04ada4bc 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -429,6 +429,7 @@ message ExecutionGraphStage {
repeated TaskStatus task_statuses = 6;
uint32 output_link = 7;
bool resolved = 8;
+ repeated OperatorMetricsSet stage_metrics = 9;
}
message ExecutionGraph {
@@ -503,6 +504,41 @@ message ColumnStats {
uint32 distinct_count = 4;
}
+message OperatorMetricsSet {
+ repeated OperatorMetric metrics = 1;
+}
+
+
+message NamedCount {
+ string name = 1;
+ uint64 value = 2;
+}
+
+message NamedGauge {
+ string name = 1;
+ uint64 value = 2;
+}
+
+message NamedTime {
+ string name = 1;
+ uint64 value = 2;
+}
+
+message OperatorMetric {
+ oneof metric {
+ uint64 output_rows = 1;
+ uint64 elapse_time = 2;
+ uint64 spill_count = 3;
+ uint64 spilled_bytes = 4;
+ uint64 current_memory_usage = 5;
+ NamedCount count = 6;
+ NamedGauge gauge = 7;
+ NamedTime time = 8;
+ int64 start_timestamp = 9;
+ int64 end_timestamp = 10;
+ }
+}
+
// Used by scheduler
message ExecutorMetadata {
string id = 1;
@@ -594,6 +630,7 @@ message TaskStatus {
FailedTask failed = 3;
CompletedTask completed = 4;
}
+ repeated OperatorMetricsSet metrics = 5;
}
message PollWorkParams {
diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs
b/ballista/rust/core/src/serde/scheduler/from_proto.rs
index b401f1fd..970a5878 100644
--- a/ballista/rust/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -15,11 +15,19 @@
// specific language governing permissions and limitations
// under the License.
+use chrono::{TimeZone, Utc};
+use datafusion::physical_plan::metrics::{
+ Count, Gauge, MetricValue, MetricsSet, Time, Timestamp,
+};
+use datafusion::physical_plan::Metric;
use std::convert::TryInto;
+use std::sync::Arc;
+use std::time::Duration;
use crate::error::BallistaError;
use crate::serde::protobuf;
use crate::serde::protobuf::action::ActionType;
+use crate::serde::protobuf::{operator_metric, NamedCount, NamedGauge,
NamedTime};
use crate::serde::scheduler::{Action, PartitionId, PartitionLocation,
PartitionStats};
impl TryInto<Action> for protobuf::Action {
@@ -104,3 +112,93 @@ impl TryInto<PartitionLocation> for
protobuf::PartitionLocation {
})
}
}
+
+impl TryInto<MetricValue> for protobuf::OperatorMetric {
+ type Error = BallistaError;
+
+ fn try_into(self) -> Result<MetricValue, Self::Error> {
+ match self.metric {
+ Some(operator_metric::Metric::OutputRows(value)) => {
+ let count = Count::new();
+ count.add(value as usize);
+ Ok(MetricValue::OutputRows(count))
+ }
+ Some(operator_metric::Metric::ElapseTime(value)) => {
+ let time = Time::new();
+ time.add_duration(Duration::from_nanos(value));
+ Ok(MetricValue::ElapsedCompute(time))
+ }
+ Some(operator_metric::Metric::SpillCount(value)) => {
+ let count = Count::new();
+ count.add(value as usize);
+ Ok(MetricValue::SpillCount(count))
+ }
+ Some(operator_metric::Metric::SpilledBytes(value)) => {
+ let count = Count::new();
+ count.add(value as usize);
+ Ok(MetricValue::SpilledBytes(count))
+ }
+ Some(operator_metric::Metric::CurrentMemoryUsage(value)) => {
+ let gauge = Gauge::new();
+ gauge.add(value as usize);
+ Ok(MetricValue::CurrentMemoryUsage(gauge))
+ }
+ Some(operator_metric::Metric::Count(NamedCount { name, value }))
=> {
+ let count = Count::new();
+ count.add(value as usize);
+ Ok(MetricValue::Count {
+ name: name.into(),
+ count,
+ })
+ }
+ Some(operator_metric::Metric::Gauge(NamedGauge { name, value }))
=> {
+ let gauge = Gauge::new();
+ gauge.add(value as usize);
+ Ok(MetricValue::Gauge {
+ name: name.into(),
+ gauge,
+ })
+ }
+ Some(operator_metric::Metric::Time(NamedTime { name, value })) => {
+ let time = Time::new();
+ time.add_duration(Duration::from_nanos(value));
+ Ok(MetricValue::Time {
+ name: name.into(),
+ time,
+ })
+ }
+ Some(operator_metric::Metric::StartTimestamp(value)) => {
+ let timestamp = Timestamp::new();
+ timestamp.set(Utc.timestamp_nanos(value));
+ Ok(MetricValue::StartTimestamp(timestamp))
+ }
+ Some(operator_metric::Metric::EndTimestamp(value)) => {
+ let timestamp = Timestamp::new();
+ timestamp.set(Utc.timestamp_nanos(value));
+ Ok(MetricValue::EndTimestamp(timestamp))
+ }
+ None => Err(BallistaError::General(
+ "scheduler::from_proto(OperatorMetric) metric is
None.".to_owned(),
+ )),
+ }
+ }
+}
+
+impl TryInto<MetricsSet> for protobuf::OperatorMetricsSet {
+ type Error = BallistaError;
+
+ fn try_into(self) -> Result<MetricsSet, Self::Error> {
+ let mut ms = MetricsSet::new();
+ let metrics = self
+ .metrics
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>, BallistaError>>()?;
+
+ for value in metrics {
+ let new_metric = Arc::new(Metric::new(value, None));
+ ms.push(new_metric)
+ }
+ Ok(ms)
+ }
+}
diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs
b/ballista/rust/core/src/serde/scheduler/to_proto.rs
index 3a1789f2..815bc96d 100644
--- a/ballista/rust/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -15,11 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
use std::convert::TryInto;
use crate::error::BallistaError;
use crate::serde::protobuf;
use crate::serde::protobuf::action::ActionType;
+use crate::serde::protobuf::{operator_metric, NamedCount, NamedGauge,
NamedTime};
use crate::serde::scheduler::{Action, PartitionId, PartitionLocation,
PartitionStats};
use datafusion::physical_plan::Partitioning;
@@ -103,3 +105,69 @@ pub fn hash_partitioning_to_proto(
))),
}
}
+
+impl TryInto<protobuf::OperatorMetric> for &MetricValue {
+ type Error = BallistaError;
+
+ fn try_into(self) -> Result<protobuf::OperatorMetric, Self::Error> {
+ match self {
+ MetricValue::OutputRows(count) => Ok(protobuf::OperatorMetric {
+ metric: Some(operator_metric::Metric::OutputRows(count.value()
as u64)),
+ }),
+ MetricValue::ElapsedCompute(time) => Ok(protobuf::OperatorMetric {
+ metric: Some(operator_metric::Metric::ElapseTime(time.value()
as u64)),
+ }),
+ MetricValue::SpillCount(count) => Ok(protobuf::OperatorMetric {
+ metric: Some(operator_metric::Metric::SpillCount(count.value()
as u64)),
+ }),
+ MetricValue::SpilledBytes(count) => Ok(protobuf::OperatorMetric {
+ metric:
Some(operator_metric::Metric::SpilledBytes(count.value() as u64)),
+ }),
+ MetricValue::CurrentMemoryUsage(gauge) =>
Ok(protobuf::OperatorMetric {
+ metric: Some(operator_metric::Metric::CurrentMemoryUsage(
+ gauge.value() as u64
+ )),
+ }),
+ MetricValue::Count { name, count } => Ok(protobuf::OperatorMetric {
+ metric: Some(operator_metric::Metric::Count(NamedCount {
+ name: name.to_string(),
+ value: count.value() as u64,
+ })),
+ }),
+ MetricValue::Gauge { name, gauge } => Ok(protobuf::OperatorMetric {
+ metric: Some(operator_metric::Metric::Gauge(NamedGauge {
+ name: name.to_string(),
+ value: gauge.value() as u64,
+ })),
+ }),
+ MetricValue::Time { name, time } => Ok(protobuf::OperatorMetric {
+ metric: Some(operator_metric::Metric::Time(NamedTime {
+ name: name.to_string(),
+ value: time.value() as u64,
+ })),
+ }),
+ MetricValue::StartTimestamp(timestamp) =>
Ok(protobuf::OperatorMetric {
+ metric: Some(operator_metric::Metric::StartTimestamp(
+ timestamp.value().map(|m|
m.timestamp_nanos()).unwrap_or(0),
+ )),
+ }),
+ MetricValue::EndTimestamp(timestamp) =>
Ok(protobuf::OperatorMetric {
+ metric: Some(operator_metric::Metric::EndTimestamp(
+ timestamp.value().map(|m|
m.timestamp_nanos()).unwrap_or(0),
+ )),
+ }),
+ }
+ }
+}
+
+impl TryInto<protobuf::OperatorMetricsSet> for MetricsSet {
+ type Error = BallistaError;
+
+ fn try_into(self) -> Result<protobuf::OperatorMetricsSet, Self::Error> {
+ let metrics = self
+ .iter()
+ .map(|m| m.value().try_into())
+ .collect::<Result<Vec<_>, BallistaError>>()?;
+ Ok(protobuf::OperatorMetricsSet { metrics })
+ }
+}
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index d356703a..ae8d5773 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -38,6 +38,7 @@ use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_join::HashJoinExec;
+use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream};
@@ -342,3 +343,16 @@ pub fn create_grpc_server() -> Server {
.http2_keepalive_interval(Option::Some(Duration::from_secs(300)))
.http2_keepalive_timeout(Option::Some(Duration::from_secs(20)))
}
+
+pub fn collect_plan_metrics(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
+ let mut metrics_array = Vec::<MetricsSet>::new();
+ if let Some(metrics) = plan.metrics() {
+ metrics_array.push(metrics);
+ }
+ plan.children().iter().for_each(|c| {
+ collect_plan_metrics(c.as_ref())
+ .into_iter()
+ .for_each(|e| metrics_array.push(e))
+ });
+ metrics_array
+}
diff --git a/ballista/rust/executor/src/execution_loop.rs
b/ballista/rust/executor/src/execution_loop.rs
index c9408124..471377e4 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -28,12 +28,14 @@ use ballista_core::error::BallistaError;
use
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+use ballista_core::utils::collect_plan_metrics;
use datafusion::execution::context::TaskContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
use futures::FutureExt;
use log::{debug, error, info, trace, warn};
use std::any::Any;
use std::collections::HashMap;
+use std::convert::TryInto;
use std::error::Error;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -183,14 +185,18 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan,
U: 'static + AsExecution
plan.schema().as_ref(),
)?;
+ let shuffle_writer_plan = executor.new_shuffle_writer(
+ task_id.job_id.clone(),
+ task_id.stage_id as usize,
+ plan,
+ )?;
tokio::spawn(async move {
use std::panic::AssertUnwindSafe;
-
let execution_result = match
AssertUnwindSafe(executor.execute_shuffle_write(
task_id.job_id.clone(),
task_id.stage_id as usize,
task_id.partition_id as usize,
- plan,
+ shuffle_writer_plan.clone(),
task_context,
shuffle_output_partitioning,
))
@@ -209,10 +215,18 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan,
U: 'static + AsExecution
debug!("Statistics: {:?}", execution_result);
available_tasks_slots.fetch_add(1, Ordering::SeqCst);
+ let plan_metrics = collect_plan_metrics(shuffle_writer_plan.as_ref());
+ let operator_metrics = plan_metrics
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>, BallistaError>>()
+ .ok();
+
let _ = task_status_sender.send(as_task_status(
execution_result,
executor.metadata.id.clone(),
task_id,
+ operator_metrics,
));
});
diff --git a/ballista/rust/executor/src/executor.rs
b/ballista/rust/executor/src/executor.rs
index ad643ba1..34af3427 100644
--- a/ballista/rust/executor/src/executor.rs
+++ b/ballista/rust/executor/src/executor.rs
@@ -88,16 +88,30 @@ impl Executor {
job_id: String,
stage_id: usize,
part: usize,
- plan: Arc<dyn ExecutionPlan>,
+ shuffle_writer: Arc<ShuffleWriterExec>,
task_ctx: Arc<TaskContext>,
_shuffle_output_partitioning: Option<Partitioning>,
) -> Result<Vec<protobuf::ShuffleWritePartition>, BallistaError> {
+ let partitions = shuffle_writer.execute_shuffle_write(part,
task_ctx).await?;
+ self.metrics_collector
+ .record_stage(&job_id, stage_id, part, shuffle_writer);
+
+ Ok(partitions)
+ }
+
+ /// Recreate the shuffle writer with the correct working directory.
+ pub fn new_shuffle_writer(
+ &self,
+ job_id: String,
+ stage_id: usize,
+ plan: Arc<dyn ExecutionPlan>,
+ ) -> Result<Arc<ShuffleWriterExec>, BallistaError> {
let exec = if let Some(shuffle_writer) =
plan.as_any().downcast_ref::<ShuffleWriterExec>()
{
// recreate the shuffle writer with the correct working directory
ShuffleWriterExec::try_new(
- job_id.clone(),
+ job_id,
stage_id,
plan.children()[0].clone(),
self.work_dir.clone(),
@@ -109,13 +123,7 @@ impl Executor {
.to_string(),
))
}?;
-
- let partitions = exec.execute_shuffle_write(part, task_ctx).await?;
-
- self.metrics_collector
- .record_stage(&job_id, stage_id, part, exec);
-
- Ok(partitions)
+ Ok(Arc::new(exec))
}
pub fn work_dir(&self) -> &str {
diff --git a/ballista/rust/executor/src/executor_server.rs
b/ballista/rust/executor/src/executor_server.rs
index e484e05a..591c88a1 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -16,6 +16,7 @@
// under the License.
use std::collections::HashMap;
+use std::convert::TryInto;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -39,7 +40,7 @@ use ballista_core::serde::protobuf::{
};
use ballista_core::serde::scheduler::ExecutorState;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
-use ballista_core::utils::create_grpc_server;
+use ballista_core::utils::{collect_plan_metrics, create_grpc_server};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -233,13 +234,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
plan.schema().as_ref(),
)?;
+ let shuffle_writer_plan = self.executor.new_shuffle_writer(
+ task_id.job_id.clone(),
+ task_id.stage_id as usize,
+ plan,
+ )?;
+
let execution_result = self
.executor
.execute_shuffle_write(
task_id.job_id.clone(),
task_id.stage_id as usize,
task_id.partition_id as usize,
- plan,
+ shuffle_writer_plan.clone(),
task_context,
shuffle_output_partitioning,
)
@@ -247,8 +254,18 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
info!("Done with task {}", task_id_log);
debug!("Statistics: {:?}", execution_result);
+ let plan_metrics = collect_plan_metrics(shuffle_writer_plan.as_ref());
+ let operator_metrics = plan_metrics
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>, BallistaError>>()?;
let executor_id = &self.executor.metadata.id;
- let task_status = as_task_status(execution_result,
executor_id.clone(), task_id);
+ let task_status = as_task_status(
+ execution_result,
+ executor_id.clone(),
+ task_id,
+ Some(operator_metrics),
+ );
let task_status_sender = self.executor_env.tx_task_status.clone();
task_status_sender.send(task_status).await.unwrap();
diff --git a/ballista/rust/executor/src/lib.rs
b/ballista/rust/executor/src/lib.rs
index 4d145b26..e93993b6 100644
--- a/ballista/rust/executor/src/lib.rs
+++ b/ballista/rust/executor/src/lib.rs
@@ -32,21 +32,27 @@ pub use standalone::new_standalone_executor;
use log::info;
use ballista_core::serde::protobuf::{
- task_status, CompletedTask, FailedTask, PartitionId, ShuffleWritePartition,
- TaskStatus,
+ task_status, CompletedTask, FailedTask, OperatorMetricsSet, PartitionId,
+ ShuffleWritePartition, TaskStatus,
};
pub fn as_task_status(
execution_result: ballista_core::error::Result<Vec<ShuffleWritePartition>>,
executor_id: String,
task_id: PartitionId,
+ operator_metrics: Option<Vec<OperatorMetricsSet>>,
) -> TaskStatus {
+ let metrics = operator_metrics.unwrap_or_default();
match execution_result {
Ok(partitions) => {
- info!("Task {:?} finished", task_id);
-
+ info!(
+ "Task {:?} finished with operator_metrics array size {}",
+ task_id,
+ metrics.len()
+ );
TaskStatus {
task_id: Some(task_id),
+ metrics,
status: Some(task_status::Status::Completed(CompletedTask {
executor_id,
partitions,
@@ -59,6 +65,7 @@ pub fn as_task_status(
TaskStatus {
task_id: Some(task_id),
+ metrics,
status: Some(task_status::Status::Failed(FailedTask {
error: format!("Task failed due to Tokio error: {}",
error_msg),
})),
diff --git a/ballista/rust/executor/src/metrics/mod.rs
b/ballista/rust/executor/src/metrics/mod.rs
index 2c7e1d50..9a735c50 100644
--- a/ballista/rust/executor/src/metrics/mod.rs
+++ b/ballista/rust/executor/src/metrics/mod.rs
@@ -17,6 +17,7 @@
use ballista_core::execution_plans::ShuffleWriterExec;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use std::sync::Arc;
/// `ExecutorMetricsCollector` records metrics for `ShuffleWriteExec`
/// after they are executed.
@@ -30,7 +31,7 @@ pub trait ExecutorMetricsCollector: Send + Sync {
job_id: &str,
stage_id: usize,
partition: usize,
- plan: ShuffleWriterExec,
+ plan: Arc<ShuffleWriterExec>,
);
}
@@ -45,14 +46,14 @@ impl ExecutorMetricsCollector for LoggingMetricsCollector {
job_id: &str,
stage_id: usize,
partition: usize,
- plan: ShuffleWriterExec,
+ plan: Arc<ShuffleWriterExec>,
) {
println!(
"=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
job_id,
stage_id,
partition,
- DisplayableExecutionPlan::with_metrics(&plan).indent()
+ DisplayableExecutionPlan::with_metrics(plan.as_ref()).indent()
);
}
}
diff --git a/ballista/rust/scheduler/src/display.rs
b/ballista/rust/scheduler/src/display.rs
new file mode 100644
index 00000000..e4557a6b
--- /dev/null
+++ b/ballista/rust/scheduler/src/display.rs
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Implementation of ballista physical plan display with metrics. See
+//! [`crate::physical_plan::displayable`] for examples of how to
+//! format
+
+use datafusion::logical_plan::{StringifiedPlan, ToStringifiedPlan};
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::{
+ accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor,
+};
+use std::fmt;
+
+/// Wraps an `ExecutionPlan` to display this plan with metrics
collected/aggregated.
+/// The metrics must be collected in the same order as how we visit and
display the plan.
+pub struct DisplayableBallistaExecutionPlan<'a> {
+ inner: &'a dyn ExecutionPlan,
+ metrics: &'a Vec<MetricsSet>,
+}
+
+impl<'a> DisplayableBallistaExecutionPlan<'a> {
+    /// Create a wrapper around an [`ExecutionPlan`] which can be
+ /// pretty printed with aggregated metrics.
+ pub fn new(inner: &'a dyn ExecutionPlan, metrics: &'a Vec<MetricsSet>) ->
Self {
+ Self { inner, metrics }
+ }
+
+ /// Return a `format`able structure that produces a single line
+ /// per node.
+ ///
+ /// ```text
+ /// ProjectionExec: expr=[a]
+ /// CoalesceBatchesExec: target_batch_size=4096
+ /// FilterExec: a < 5
+ /// RepartitionExec: partitioning=RoundRobinBatch(16)
+ /// CsvExec: source=...",
+ /// ```
+ pub fn indent(&self) -> impl fmt::Display + 'a {
+ struct Wrapper<'a> {
+ plan: &'a dyn ExecutionPlan,
+ metrics: &'a Vec<MetricsSet>,
+ }
+ impl<'a> fmt::Display for Wrapper<'a> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ let t = DisplayFormatType::Default;
+ let mut visitor = IndentVisitor {
+ t,
+ f,
+ indent: 0,
+ metrics: self.metrics,
+ metric_index: 0,
+ };
+ accept(self.plan, &mut visitor)
+ }
+ }
+ Wrapper {
+ plan: self.inner,
+ metrics: self.metrics,
+ }
+ }
+}
+
+/// Formats plans with a single line per node.
+struct IndentVisitor<'a, 'b> {
+ /// How to format each node
+ t: DisplayFormatType,
+ /// Write to this formatter
+ f: &'a mut fmt::Formatter<'b>,
+ /// Indent size
+ indent: usize,
+ /// The metrics along with the plan
+ metrics: &'a Vec<MetricsSet>,
+ /// The metric index
+ metric_index: usize,
+}
+
+impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
+ type Error = fmt::Error;
+ fn pre_visit(
+ &mut self,
+ plan: &dyn ExecutionPlan,
+ ) -> std::result::Result<bool, Self::Error> {
+ write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
+ plan.fmt_as(self.t, self.f)?;
+ if let Some(metrics) = self.metrics.get(self.metric_index) {
+ let metrics = metrics
+ .aggregate_by_partition()
+ .sorted_for_display()
+ .timestamps_removed();
+ write!(self.f, ", metrics=[{}]", metrics)?;
+ } else {
+ write!(self.f, ", metrics=[]")?;
+ }
+ writeln!(self.f)?;
+ self.indent += 1;
+ self.metric_index += 1;
+ Ok(true)
+ }
+
+ fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> Result<bool,
Self::Error> {
+ self.indent -= 1;
+ Ok(true)
+ }
+}
+
+impl<'a> ToStringifiedPlan for DisplayableBallistaExecutionPlan<'a> {
+ fn to_stringified(
+ &self,
+ plan_type: datafusion::logical_plan::PlanType,
+ ) -> StringifiedPlan {
+ StringifiedPlan::new(plan_type, self.indent().to_string())
+ }
+}
diff --git a/ballista/rust/scheduler/src/lib.rs
b/ballista/rust/scheduler/src/lib.rs
index 2c2fa410..838eb562 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -18,6 +18,7 @@
#![doc = include_str ! ("../README.md")]
pub mod api;
+pub mod display;
pub mod planner;
pub mod scheduler_server;
#[cfg(feature = "sled")]
diff --git a/ballista/rust/scheduler/src/planner.rs
b/ballista/rust/scheduler/src/planner.rs
index 1c946937..9c393bac 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -63,7 +63,7 @@ impl DistributedPlanner {
job_id: &'a str,
execution_plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Arc<ShuffleWriterExec>>> {
- info!("planning query stages");
+ info!("planning query stages for job {}", job_id);
let (new_plan, mut stages) =
self.plan_query_stages_internal(job_id, execution_plan)?;
stages.push(create_shuffle_writer(
diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
index b0037ab0..d6397ba6 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
@@ -322,6 +322,7 @@ mod test {
stage_id: 1,
partition_id: 0,
}),
+ metrics: vec![],
status: Some(task_status::Status::Completed(CompletedTask {
executor_id: "executor-1".to_string(),
partitions,
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs
b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index a6a26d80..2fac229b 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -364,6 +364,7 @@ mod test {
executor_id: "executor-1".to_owned(),
partitions,
})),
+ metrics: vec![],
task_id: Some(PartitionId {
job_id: job_id.to_owned(),
stage_id: task.partition.stage_id as u32,
@@ -493,6 +494,7 @@ mod test {
partitions,
},
)),
+ metrics: vec![],
task_id: Some(PartitionId {
job_id: job_id.to_owned(),
stage_id: task.partition.stage_id as
u32,
@@ -627,6 +629,7 @@ mod test {
error: "".to_string(),
},
)),
+ metrics: vec![],
task_id: Some(PartitionId {
job_id: job_id.to_owned(),
stage_id: task.partition.stage_id as
u32,
diff --git a/ballista/rust/scheduler/src/state/execution_graph.rs
b/ballista/rust/scheduler/src/state/execution_graph.rs
index 1412f7e0..b13f7c58 100644
--- a/ballista/rust/scheduler/src/state/execution_graph.rs
+++ b/ballista/rust/scheduler/src/state/execution_graph.rs
@@ -15,12 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+use crate::display::DisplayableBallistaExecutionPlan;
use crate::planner::DistributedPlanner;
use ballista_core::error::{BallistaError, Result};
use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
use ballista_core::serde::protobuf::{
- self, CompletedJob, JobStatus, QueuedJob, TaskStatus,
+ self, CompletedJob, JobStatus, OperatorMetricsSet, QueuedJob, TaskStatus,
};
use ballista_core::serde::protobuf::{job_status, FailedJob,
ShuffleWritePartition};
use ballista_core::serde::protobuf::{task_status, RunningTask};
@@ -28,14 +29,16 @@ use ballista_core::serde::scheduler::{
ExecutorMetadata, PartitionId, PartitionLocation, PartitionStats,
};
use datafusion::physical_plan::{
- accept, ExecutionPlan, ExecutionPlanVisitor, Partitioning,
+ accept, ExecutionPlan, ExecutionPlanVisitor, Metric, Partitioning,
};
-use log::debug;
+use log::{debug, info};
use std::collections::HashMap;
use std::convert::TryInto;
use std::fmt::{Debug, Formatter};
+use ballista_core::utils::collect_plan_metrics;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
use std::sync::Arc;
/// This data structure collects the partition locations for an
`ExecutionStage`.
@@ -103,6 +106,8 @@ pub struct ExecutionStage {
/// Flag indicating whether all input partitions have been resolved and
the plan
/// has UnresovledShuffleExec operators resolved to ShuffleReadExec
operators.
pub(crate) resolved: bool,
+    /// Combined metrics of the already finished tasks in the stage. If it is
None, no task is finished yet.
+    pub(crate) stage_metrics: Option<Vec<MetricsSet>>,
}
impl Debug for ExecutionStage {
@@ -153,6 +158,7 @@ impl ExecutionStage {
task_statuses: vec![None; num_tasks],
output_link,
resolved,
+ stage_metrics: None,
}
}
@@ -230,6 +236,60 @@ impl ExecutionStage {
self.task_statuses[partition] = Some(status);
}
+ /// update and combine the task metrics to the stage metrics
+ pub fn update_task_metrics(
+ &mut self,
+ partition: usize,
+ metrics: Vec<OperatorMetricsSet>,
+ ) -> Result<()> {
+ if let Some(combined_metrics) = &mut self.stage_metrics {
+ if metrics.len() != combined_metrics.len() {
+ return Err(BallistaError::Internal(format!("Error updating
task metrics to stage {}, task metrics array size {} does not equal \
+ with the stage metrics array size {} for task {}",
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
+ }
+ let metrics_values_array = metrics
+ .into_iter()
+ .map(|ms| {
+ ms.metrics
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>>>()
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let new_metrics_set = combined_metrics
+ .iter_mut()
+ .zip(metrics_values_array)
+ .map(|(first, second)| {
+ Self::combine_metrics_set(first, second, partition)
+ })
+ .collect();
+ self.stage_metrics = Some(new_metrics_set)
+ } else {
+ let new_metrics_set = metrics
+ .into_iter()
+ .map(|ms| ms.try_into())
+ .collect::<Result<Vec<_>>>()?;
+ if !new_metrics_set.is_empty() {
+ self.stage_metrics = Some(new_metrics_set)
+ }
+ }
+ Ok(())
+ }
+
+ pub fn combine_metrics_set(
+ first: &mut MetricsSet,
+ second: Vec<MetricValue>,
+ partition: usize,
+ ) -> MetricsSet {
+ for metric_value in second {
+            // TODO recheck the label logic
+ let new_metric = Arc::new(Metric::new(metric_value,
Some(partition)));
+ first.push(new_metric);
+ }
+ first.aggregate_by_partition()
+ }
+
/// Add input partitions published from an input stage.
pub fn add_input_partitions(
&mut self,
@@ -467,8 +527,8 @@ impl ExecutionGraph {
self.stages.values().all(|s| s.complete())
}
- /// Update task statuses in the graph. This will push shuffle partitions
to their
- /// respective shuffle read stages.
+ /// Update task statuses and task metrics in the graph.
+ /// This will also push shuffle partitions to their respective shuffle
read stages.
pub fn update_task_status(
&mut self,
executor: &ExecutorMetadata,
@@ -482,6 +542,7 @@ impl ExecutionGraph {
stage_id,
partition_id,
}),
+ metrics: operator_metrics,
status: Some(task_status),
} = status
{
@@ -497,6 +558,7 @@ impl ExecutionGraph {
let partition = partition_id as usize;
if let Some(stage) = self.stages.get_mut(&stage_id) {
stage.update_task_status(partition, task_status.clone());
+ let stage_plan = stage.plan.clone();
let stage_complete = stage.complete();
// TODO Should be able to reschedule this task.
@@ -513,6 +575,40 @@ impl ExecutionGraph {
} else if let
task_status::Status::Completed(completed_task) =
task_status
{
+ // update task metrics for completed task
+ stage.update_task_metrics(partition,
operator_metrics)?;
+
+ // if this stage is completed, we want to combine the
stage metrics to plan's metric set and print out the plan
+ if stage_complete &&
stage.stage_metrics.as_ref().is_some() {
+ // The plan_metrics collected here is a snapshot
clone from the plan metrics.
+ // They are all empty now and need to combine with
the stage metrics in the ExecutionStages
+ let mut plan_metrics =
+ collect_plan_metrics(stage_plan.as_ref());
+ let stage_metrics = stage
+ .stage_metrics
+ .as_ref()
+ .expect("stage metrics should not be None.");
+ if plan_metrics.len() != stage_metrics.len() {
+ return
Err(BallistaError::Internal(format!("Error combine stage metrics to plan for
stage {}, plan metrics array size {} does not equal \
+ to the stage metrics array size {}", stage_id,
plan_metrics.len(), stage_metrics.len())));
+ }
+
plan_metrics.iter_mut().zip(stage_metrics).for_each(
+ |(plan_metric, stage_metric)| {
+ stage_metric
+ .iter()
+ .for_each(|s|
plan_metric.push(s.clone()));
+ },
+ );
+
+ info!(
+ "=== [{}/{}/{}] Stage finished, physical plan
with metrics ===\n{}\n",
+ job_id,
+ stage_id,
+ partition,
+
DisplayableBallistaExecutionPlan::new(stage_plan.as_ref(),
plan_metrics.as_ref()).indent()
+ );
+ }
+
let locations = partition_to_location(
self.job_id.as_str(),
stage_id,
@@ -808,6 +904,7 @@ mod test {
executor_id: "executor-1".to_owned(),
partitions,
})),
+ metrics: vec![],
task_id: Some(protobuf::PartitionId {
job_id: job_id.clone(),
stage_id: task.partition.stage_id as u32,
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs
b/ballista/rust/scheduler/src/state/task_manager.rs
index a5bc049a..cc329263 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -590,6 +590,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
},
);
}
+ let stage_metrics = if stage.stage_metrics.is_empty() {
+ None
+ } else {
+ let ms = stage
+ .stage_metrics
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>>>()?;
+ Some(ms)
+ };
let execution_stage = ExecutionStage {
stage_id: stage.stage_id as usize,
@@ -600,6 +610,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
task_statuses,
output_link,
resolved: stage.resolved,
+ stage_metrics,
};
stages.insert(stage_id, execution_stage);
}
@@ -681,6 +692,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
stage_id: stage_id as u32,
partition_id: partition as u32,
}),
+ // task metrics should not persist.
+ metrics: vec![],
status: Some(status),
})
})
@@ -689,6 +702,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
let output_partitioning =
hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
+ let stage_metrics = stage
+ .stage_metrics
+ .unwrap_or_default()
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>>>()?;
Ok(protobuf::ExecutionGraphStage {
stage_id: stage_id as u64,
partitions: stage.partitions as u32,
@@ -698,6 +717,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
task_statuses,
output_link,
resolved: stage.resolved,
+ stage_metrics,
})
})
.collect::<Result<Vec<_>>>()?;