This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 2202ba2a feat: improve executor logs (#1187)
2202ba2a is described below

commit 2202ba2a9b26c3d9aff032127ff03e8cde20a75c
Author: Marko Milenković <[email protected]>
AuthorDate: Wed Mar 5 15:38:22 2025 +0000

    feat: improve executor logs (#1187)
    
    * feat: improve executor loggers
    
    * add metrics to logs
---
 .../core/src/execution_plans/shuffle_reader.rs     |  6 +++---
 .../core/src/execution_plans/shuffle_writer.rs     | 19 ++++++++++++++++--
 ballista/core/src/extension.rs                     |  7 +++++++
 ballista/executor/src/execution_engine.rs          | 23 ++++++++++++++++++++--
 ballista/executor/src/metrics/mod.rs               |  2 +-
 5 files changed, 49 insertions(+), 8 deletions(-)

diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs 
b/ballista/core/src/execution_plans/shuffle_reader.rs
index f8259364..f52c24f1 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -49,7 +49,7 @@ use crate::error::BallistaError;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use itertools::Itertools;
-use log::{error, info};
+use log::{debug, error};
 use rand::prelude::SliceRandom;
 use rand::thread_rng;
 use tokio::sync::{mpsc, Semaphore};
@@ -145,7 +145,7 @@ impl ExecutionPlan for ShuffleReaderExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let task_id = context.task_id().unwrap_or_else(|| 
partition.to_string());
-        info!("ShuffleReaderExec::execute({})", task_id);
+        debug!("ShuffleReaderExec::execute({})", task_id);
 
         let config = context.session_config();
 
@@ -304,7 +304,7 @@ fn send_fetch_partitions(
         .into_iter()
         .partition(check_is_local_location);
 
-    info!(
+    debug!(
         "local shuffle file counts:{}, remote shuffle file count:{}.",
         local_locations.len(),
         remote_locations.len()
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs 
b/ballista/core/src/execution_plans/shuffle_writer.rs
index 23b437f6..b6f2dcbe 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -25,6 +25,7 @@ use datafusion::arrow::ipc::CompressionType;
 
 use datafusion::arrow::ipc::writer::StreamWriter;
 use std::any::Any;
+use std::fmt::Debug;
 use std::fs;
 use std::fs::File;
 use std::future::Future;
@@ -50,8 +51,8 @@ use datafusion::physical_plan::metrics::{
 };
 
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
-    SendableRecordBatchStream, Statistics,
+    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    PlanProperties, SendableRecordBatchStream, Statistics,
 };
 use futures::{StreamExt, TryFutureExt, TryStreamExt};
 
@@ -80,9 +81,23 @@ pub struct ShuffleWriterExec {
     shuffle_output_partitioning: Option<Partitioning>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Plan properties
     properties: PlanProperties,
 }
 
+impl std::fmt::Display for ShuffleWriterExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let printable_plan = displayable(self.plan.as_ref())
+            .set_show_statistics(true)
+            .indent(false);
+        write!(
+            f,
+            "ShuffleWriterExec: job={} stage={} work_dir={} partitioning={:?} 
plan: \n {}",
+            self.job_id, self.stage_id, self.work_dir, 
self.shuffle_output_partitioning, printable_plan
+        )
+    }
+}
+
 pub struct WriteTracker {
     pub num_batches: usize,
     pub num_rows: usize,
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index 4ff1af98..4a25ee12 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -315,6 +315,13 @@ impl SessionConfigHelperExt for SessionConfig {
         self.options()
             .entries()
             .iter()
+            // TODO: revisit this log once this option is removed
+            //
+            // filtering this key as it's creating a lot of warning logs
+            // at the executor side.
+            .filter(|c| {
+                c.key != 
"datafusion.sql_parser.enable_options_value_normalization"
+            })
             .map(|datafusion::config::ConfigEntry { key, value, .. }| {
                 log::trace!("sending configuration key: `{}`, value`{:?}`", 
key, value);
                 KeyValuePair {
diff --git a/ballista/executor/src/execution_engine.rs 
b/ballista/executor/src/execution_engine.rs
index 42084267..578f3f35 100644
--- a/ballista/executor/src/execution_engine.rs
+++ b/ballista/executor/src/execution_engine.rs
@@ -23,7 +23,7 @@ use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::ExecutionPlan;
-use std::fmt::Debug;
+use std::fmt::{Debug, Display};
 use std::sync::Arc;
 
 /// Execution engine extension point
@@ -42,7 +42,7 @@ pub trait ExecutionEngine: Sync + Send {
 /// partition is re-partitioned and streamed to disk in Arrow IPC format. 
Future stages of the query
 /// will use the ShuffleReaderExec to read these results.
 #[async_trait]
-pub trait QueryStageExecutor: Sync + Send + Debug {
+pub trait QueryStageExecutor: Sync + Send + Debug + Display {
     async fn execute_query_stage(
         &self,
         input_partition: usize,
@@ -95,6 +95,25 @@ impl DefaultQueryStageExec {
     }
 }
 
+impl Display for DefaultQueryStageExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let stage_metrics: Vec<String> = self
+            .shuffle_writer
+            .metrics()
+            .unwrap_or_default()
+            .iter()
+            .map(|m| m.to_string())
+            .collect();
+
+        write!(
+            f,
+            "DefaultQueryStageExec: ({})\n{}",
+            stage_metrics.join(", "),
+            self.shuffle_writer
+        )
+    }
+}
+
 #[async_trait]
 impl QueryStageExecutor for DefaultQueryStageExec {
     async fn execute_query_stage(
diff --git a/ballista/executor/src/metrics/mod.rs 
b/ballista/executor/src/metrics/mod.rs
index 9a0f58fa..10fb0ef0 100644
--- a/ballista/executor/src/metrics/mod.rs
+++ b/ballista/executor/src/metrics/mod.rs
@@ -49,7 +49,7 @@ impl ExecutorMetricsCollector for LoggingMetricsCollector {
         plan: Arc<dyn QueryStageExecutor>,
     ) {
         info!(
-            "=== [{}/{}/{}] Physical plan with metrics ===\n{:?}\n",
+            "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
             job_id, stage_id, partition, plan
         );
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to