This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 2202ba2a feat: improve executor logs (#1187)
2202ba2a is described below
commit 2202ba2a9b26c3d9aff032127ff03e8cde20a75c
Author: Marko Milenković <[email protected]>
AuthorDate: Wed Mar 5 15:38:22 2025 +0000
feat: improve executor logs (#1187)
* feat: improve executor loggers
* add metrics to logs
---
.../core/src/execution_plans/shuffle_reader.rs | 6 +++---
.../core/src/execution_plans/shuffle_writer.rs | 19 ++++++++++++++++--
ballista/core/src/extension.rs | 7 +++++++
ballista/executor/src/execution_engine.rs | 23 ++++++++++++++++++++--
ballista/executor/src/metrics/mod.rs | 2 +-
5 files changed, 49 insertions(+), 8 deletions(-)
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs
b/ballista/core/src/execution_plans/shuffle_reader.rs
index f8259364..f52c24f1 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -49,7 +49,7 @@ use crate::error::BallistaError;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use itertools::Itertools;
-use log::{error, info};
+use log::{debug, error};
use rand::prelude::SliceRandom;
use rand::thread_rng;
use tokio::sync::{mpsc, Semaphore};
@@ -145,7 +145,7 @@ impl ExecutionPlan for ShuffleReaderExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let task_id = context.task_id().unwrap_or_else(||
partition.to_string());
- info!("ShuffleReaderExec::execute({})", task_id);
+ debug!("ShuffleReaderExec::execute({})", task_id);
let config = context.session_config();
@@ -304,7 +304,7 @@ fn send_fetch_partitions(
.into_iter()
.partition(check_is_local_location);
- info!(
+ debug!(
"local shuffle file counts:{}, remote shuffle file count:{}.",
local_locations.len(),
remote_locations.len()
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs
b/ballista/core/src/execution_plans/shuffle_writer.rs
index 23b437f6..b6f2dcbe 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -25,6 +25,7 @@ use datafusion::arrow::ipc::CompressionType;
use datafusion::arrow::ipc::writer::StreamWriter;
use std::any::Any;
+use std::fmt::Debug;
use std::fs;
use std::fs::File;
use std::future::Future;
@@ -50,8 +51,8 @@ use datafusion::physical_plan::metrics::{
};
use datafusion::physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
- SendableRecordBatchStream, Statistics,
+ displayable, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+ PlanProperties, SendableRecordBatchStream, Statistics,
};
use futures::{StreamExt, TryFutureExt, TryStreamExt};
@@ -80,9 +81,23 @@ pub struct ShuffleWriterExec {
shuffle_output_partitioning: Option<Partitioning>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
+ /// Plan properties
properties: PlanProperties,
}
+impl std::fmt::Display for ShuffleWriterExec {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ let printable_plan = displayable(self.plan.as_ref())
+ .set_show_statistics(true)
+ .indent(false);
+ write!(
+ f,
+ "ShuffleWriterExec: job={} stage={} work_dir={} partitioning={:?}
plan: \n {}",
+ self.job_id, self.stage_id, self.work_dir,
self.shuffle_output_partitioning, printable_plan
+ )
+ }
+}
+
pub struct WriteTracker {
pub num_batches: usize,
pub num_rows: usize,
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index 4ff1af98..4a25ee12 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -315,6 +315,13 @@ impl SessionConfigHelperExt for SessionConfig {
self.options()
.entries()
.iter()
+ // TODO: revisit this log once this option is removed
+ //
+ // filtering this key as it's creating a lot of warning logs
+ // at the executor side.
+ .filter(|c| {
+ c.key !=
"datafusion.sql_parser.enable_options_value_normalization"
+ })
.map(|datafusion::config::ConfigEntry { key, value, .. }| {
log::trace!("sending configuration key: `{}`, value`{:?}`",
key, value);
KeyValuePair {
diff --git a/ballista/executor/src/execution_engine.rs
b/ballista/executor/src/execution_engine.rs
index 42084267..578f3f35 100644
--- a/ballista/executor/src/execution_engine.rs
+++ b/ballista/executor/src/execution_engine.rs
@@ -23,7 +23,7 @@ use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::ExecutionPlan;
-use std::fmt::Debug;
+use std::fmt::{Debug, Display};
use std::sync::Arc;
/// Execution engine extension point
@@ -42,7 +42,7 @@ pub trait ExecutionEngine: Sync + Send {
/// partition is re-partitioned and streamed to disk in Arrow IPC format.
Future stages of the query
/// will use the ShuffleReaderExec to read these results.
#[async_trait]
-pub trait QueryStageExecutor: Sync + Send + Debug {
+pub trait QueryStageExecutor: Sync + Send + Debug + Display {
async fn execute_query_stage(
&self,
input_partition: usize,
@@ -95,6 +95,25 @@ impl DefaultQueryStageExec {
}
}
+impl Display for DefaultQueryStageExec {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let stage_metrics: Vec<String> = self
+ .shuffle_writer
+ .metrics()
+ .unwrap_or_default()
+ .iter()
+ .map(|m| m.to_string())
+ .collect();
+
+ write!(
+ f,
+ "DefaultQueryStageExec: ({})\n{}",
+ stage_metrics.join(", "),
+ self.shuffle_writer
+ )
+ }
+}
+
#[async_trait]
impl QueryStageExecutor for DefaultQueryStageExec {
async fn execute_query_stage(
diff --git a/ballista/executor/src/metrics/mod.rs
b/ballista/executor/src/metrics/mod.rs
index 9a0f58fa..10fb0ef0 100644
--- a/ballista/executor/src/metrics/mod.rs
+++ b/ballista/executor/src/metrics/mod.rs
@@ -49,7 +49,7 @@ impl ExecutorMetricsCollector for LoggingMetricsCollector {
plan: Arc<dyn QueryStageExecutor>,
) {
info!(
- "=== [{}/{}/{}] Physical plan with metrics ===\n{:?}\n",
+ "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
job_id, stage_id, partition, plan
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]