This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 7cc56ab7 feat: update rust edition to 2024 (#1355)
7cc56ab7 is described below

commit 7cc56ab7d8dbbbc974330c48cc91ce3a8c0d58e4
Author: jgrim <[email protected]>
AuthorDate: Fri Jan 2 13:11:22 2026 +0100

    feat: update rust edition to 2024 (#1355)
    
    * feat: update rust edition to 2024
    
    * fix: correct msrvcheck
    
    * migrate resolver to v3 (2024 edition default)
---
 Cargo.toml                                         |   4 +-
 ballista-cli/src/command.rs                        |   2 +-
 ballista-cli/src/exec.rs                           |   4 +-
 ballista-cli/src/main.rs                           |   2 +-
 ballista/client/tests/bugs.rs                      |  16 +-
 ballista/client/tests/common/mod.rs                |   2 +-
 ballista/client/tests/context_setup.rs             |  12 +-
 ballista/core/src/client.rs                        |   6 +-
 ballista/core/src/consistent_hash/mod.rs           |  18 ++-
 ballista/core/src/diagram.rs                       |   4 +-
 ballista/core/src/event_loop.rs                    |   2 +-
 .../core/src/execution_plans/distributed_query.rs  |   8 +-
 .../core/src/execution_plans/shuffle_reader.rs     |  23 +--
 .../core/src/execution_plans/shuffle_writer.rs     |   6 +-
 ballista/core/src/extension.rs                     |  11 +-
 ballista/core/src/object_store.rs                  |  11 +-
 ballista/core/src/planner.rs                       |   4 +-
 ballista/core/src/serde/mod.rs                     |   8 +-
 ballista/core/src/serde/scheduler/from_proto.rs    |  34 ++---
 ballista/core/src/serde/scheduler/to_proto.rs      |   2 +-
 ballista/core/src/utils.rs                         |   4 +-
 ballista/executor/src/bin/main.rs                  |   2 +-
 ballista/executor/src/collect.rs                   |   2 +-
 ballista/executor/src/execution_engine.rs          |   2 +-
 ballista/executor/src/execution_loop.rs            |  12 +-
 ballista/executor/src/executor.rs                  |   6 +-
 ballista/executor/src/executor_process.rs          |  24 +--
 ballista/executor/src/executor_server.rs           |  22 +--
 ballista/executor/src/flight_service.rs            |   6 +-
 ballista/executor/src/lib.rs                       |   4 +-
 ballista/executor/src/standalone.rs                |   8 +-
 ballista/scheduler/src/api/handlers.rs             |   8 +-
 ballista/scheduler/src/api/mod.rs                  |   2 +-
 ballista/scheduler/src/cluster/event/mod.rs        |   4 +-
 ballista/scheduler/src/cluster/memory.rs           |  23 +--
 ballista/scheduler/src/cluster/mod.rs              |  16 +-
 ballista/scheduler/src/cluster/test_util/mod.rs    |   2 +-
 ballista/scheduler/src/config.rs                   |   4 +-
 ballista/scheduler/src/display.rs                  |   2 +-
 ballista/scheduler/src/metrics/prometheus.rs       |   4 +-
 .../src/physical_optimizer/join_selection.rs       |  30 ++--
 ballista/scheduler/src/planner.rs                  |   6 +-
 ballista/scheduler/src/scheduler_process.rs        |   4 +-
 .../src/scheduler_server/external_scaler.rs        |   6 +-
 ballista/scheduler/src/scheduler_server/grpc.rs    |  41 ++---
 ballista/scheduler/src/scheduler_server/mod.rs     |  25 +--
 .../src/scheduler_server/query_stage_scheduler.rs  |  13 +-
 ballista/scheduler/src/standalone.rs               |  10 +-
 .../scheduler/src/state/distributed_explain.rs     |   2 +-
 ballista/scheduler/src/state/execution_graph.rs    | 167 ++++++++++++---------
 .../scheduler/src/state/execution_graph_dot.rs     |  10 +-
 ballista/scheduler/src/state/execution_stage.rs    |  33 +++-
 ballista/scheduler/src/state/executor_manager.rs   |  13 +-
 ballista/scheduler/src/state/mod.rs                |  24 +--
 ballista/scheduler/src/state/session_manager.rs    |   4 +-
 ballista/scheduler/src/state/task_manager.rs       |  26 +++-
 ballista/scheduler/src/test_utils.rs               |  20 +--
 benchmarks/Cargo.toml                              |   2 +-
 benchmarks/src/bin/tpch.rs                         |  16 +-
 dev/msrvcheck/Cargo.toml                           |   2 +-
 dev/msrvcheck/src/main.rs                          |   2 +-
 examples/examples/custom-executor.rs               |   2 +-
 examples/examples/remote-dataframe.rs              |   2 +-
 examples/examples/standalone-sql.rs                |   2 +-
 examples/tests/common/mod.rs                       |   4 +-
 examples/tests/object_store.rs                     |  12 +-
 rustfmt.toml                                       |   2 +-
 67 files changed, 440 insertions(+), 376 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index e9dc469c..a7c4574c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,14 +18,14 @@
 [workspace]
 exclude = ["dev/msrvcheck", "python"]
 members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executor", "ballista/scheduler", "benchmarks", "examples"]
-resolver = "2"
+resolver = "3"
 
 [workspace.package]
 # edition to be changed to 2024 when we update 
 # Minimum Supported Rust Version (MSRV) to 1.85.0
 # which is datafusion 49
 # 
-edition = "2021"
+edition = "2024"
 # we should try to follow datafusion version 
 rust-version = "1.88.0"
 
diff --git a/ballista-cli/src/command.rs b/ballista-cli/src/command.rs
index ebd4c787..95ed32ce 100644
--- a/ballista-cli/src/command.rs
+++ b/ballista-cli/src/command.rs
@@ -28,7 +28,7 @@ use datafusion::common::Result;
 use datafusion::error::DataFusionError;
 use datafusion::prelude::SessionContext;
 
-use crate::functions::{display_all_functions, Function};
+use crate::functions::{Function, display_all_functions};
 use crate::print_format::PrintFormat;
 use crate::print_options::PrintOptions;
 
diff --git a/ballista-cli/src/exec.rs b/ballista-cli/src/exec.rs
index efbd94fa..887728f6 100644
--- a/ballista-cli/src/exec.rs
+++ b/ballista-cli/src/exec.rs
@@ -18,15 +18,15 @@
 //! Execution functions
 
 use std::fs::File;
-use std::io::prelude::*;
 use std::io::BufReader;
+use std::io::prelude::*;
 use std::sync::Arc;
 use std::time::Instant;
 
 use datafusion::common::Result;
 use datafusion::prelude::SessionContext;
-use rustyline::error::ReadlineError;
 use rustyline::Editor;
+use rustyline::error::ReadlineError;
 
 use crate::{
     command::{Command, OutputFormat},
diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs
index 098d23a4..2a6c691d 100644
--- a/ballista-cli/src/main.rs
+++ b/ballista-cli/src/main.rs
@@ -20,7 +20,7 @@ use std::{env, sync::Arc};
 
 use ballista::{extension::SessionConfigExt, prelude::SessionContextExt};
 use ballista_cli::{
-    exec, print_format::PrintFormat, print_options::PrintOptions, BALLISTA_CLI_VERSION,
+    BALLISTA_CLI_VERSION, exec, print_format::PrintFormat, print_options::PrintOptions,
 };
 use clap::Parser;
 use datafusion::{
diff --git a/ballista/client/tests/bugs.rs b/ballista/client/tests/bugs.rs
index 02b21da2..0c42fefe 100644
--- a/ballista/client/tests/bugs.rs
+++ b/ballista/client/tests/bugs.rs
@@ -130,14 +130,14 @@ order by sum_sales - avg_monthly_sales, s_store_name
 
         let result = ctx.sql(query).await?.collect().await?;
         let expected = [
-    "+-------------+-----------+----------------+----------------+--------+-------+--------------------+-----------+---------+---------+",
-    "| i_category  | i_brand   | s_store_name   | s_company_name | d_year | d_moy | avg_monthly_sales  | sum_sales | psum    | nsum    |",
-    "+-------------+-----------+----------------+----------------+--------+-------+--------------------+-----------+---------+---------+",
-    "| Electronics | TechBrand | Downtown Store | Retail Corp    | 1999   | 4     | 1499.9850000000001 | 999.99    | 999.99  | 999.99  |",
-    "| Electronics | TechBrand | Downtown Store | Retail Corp    | 1999   | 3     | 1499.9850000000001 | 999.99    | 1999.98 | 999.99  |",
-    "| Electronics | TechBrand | Downtown Store | Retail Corp    | 1999   | 1     | 1499.9850000000001 | 1999.98   | 1999.98 | 1999.98 |",
-    "| Electronics | TechBrand | Downtown Store | Retail Corp    | 1999   | 2     | 1499.9850000000001 | 1999.98   | 1999.98 | 999.99  |",
-    "+-------------+-----------+----------------+----------------+--------+-------+--------------------+-----------+---------+---------+",
+            "+-------------+-----------+----------------+----------------+--------+-------+--------------------+-----------+---------+---------+",
+            "| i_category  | i_brand   | s_store_name   | s_company_name | d_year | d_moy | avg_monthly_sales  | sum_sales | psum    | nsum    |",
+            "+-------------+-----------+----------------+----------------+--------+-------+--------------------+-----------+---------+---------+",
+            "| Electronics | TechBrand | Downtown Store | Retail Corp    | 1999   | 4     | 1499.9850000000001 | 999.99    | 999.99  | 999.99  |",
+            "| Electronics | TechBrand | Downtown Store | Retail Corp    | 1999   | 3     | 1499.9850000000001 | 999.99    | 1999.98 | 999.99  |",
+            "| Electronics | TechBrand | Downtown Store | Retail Corp    | 1999   | 1     | 1499.9850000000001 | 1999.98   | 1999.98 | 1999.98 |",
+            "| Electronics | TechBrand | Downtown Store | Retail Corp    | 1999   | 2     | 1499.9850000000001 | 1999.98   | 1999.98 | 999.99  |",
+            "+-------------+-----------+----------------+----------------+--------+-------+--------------------+-----------+---------+---------+",
         ];
 
         assert_batches_eq!(expected, &result);
diff --git a/ballista/client/tests/common/mod.rs b/ballista/client/tests/common/mod.rs
index 85cb3fbc..9d940be6 100644
--- a/ballista/client/tests/common/mod.rs
+++ b/ballista/client/tests/common/mod.rs
@@ -21,7 +21,7 @@ use std::path::PathBuf;
 
 use ballista::prelude::{SessionConfigExt, SessionContextExt};
 use ballista_core::serde::{
-    protobuf::scheduler_grpc_client::SchedulerGrpcClient, BallistaCodec,
+    BallistaCodec, protobuf::scheduler_grpc_client::SchedulerGrpcClient,
 };
 use ballista_core::{ConfigProducer, RuntimeProducer};
 use ballista_scheduler::SessionBuilder;
diff --git a/ballista/client/tests/context_setup.rs b/ballista/client/tests/context_setup.rs
index e1a49bb0..df999c06 100644
--- a/ballista/client/tests/context_setup.rs
+++ b/ballista/client/tests/context_setup.rs
@@ -103,7 +103,7 @@ mod remote {
 #[cfg(feature = "standalone")]
 mod standalone {
 
-    use std::sync::{atomic::AtomicBool, Arc};
+    use std::sync::{Arc, atomic::AtomicBool};
 
     use ballista::extension::{SessionConfigExt, SessionContextExt};
     use ballista_core::serde::BallistaPhysicalExtensionCodec;
@@ -111,7 +111,7 @@ mod standalone {
         assert_batches_eq,
         common::exec_err,
         execution::{
-            context::QueryPlanner, SessionState, SessionStateBuilder, TaskContext,
+            SessionState, SessionStateBuilder, TaskContext, context::QueryPlanner,
         },
         logical_expr::LogicalPlan,
         physical_plan::ExecutionPlan,
@@ -226,9 +226,11 @@ mod standalone {
             .collect()
             .await;
 
-        assert!(physical_codec
-            .invoked
-            .load(std::sync::atomic::Ordering::Relaxed));
+        assert!(
+            physical_codec
+                .invoked
+                .load(std::sync::atomic::Ordering::Relaxed)
+        );
         Ok(())
     }
 
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index 1b354346..bc3659d2 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -29,9 +29,9 @@ use crate::error::{BallistaError, Result as BResult};
 use crate::serde::scheduler::{Action, PartitionId};
 
 use arrow_flight;
-use arrow_flight::utils::flight_data_to_arrow_batch;
 use arrow_flight::Ticket;
-use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
+use arrow_flight::utils::flight_data_to_arrow_batch;
+use arrow_flight::{FlightData, flight_service_client::FlightServiceClient};
 use datafusion::arrow::array::ArrayRef;
 use datafusion::arrow::buffer::{Buffer, MutableBuffer};
 use datafusion::arrow::ipc::convert::try_schema_from_ipc_buffer;
@@ -45,7 +45,7 @@ use datafusion::error::DataFusionError;
 use datafusion::error::Result;
 
 use crate::serde::protobuf;
-use crate::utils::{create_grpc_client_connection, GrpcClientConfig};
+use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
 use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use futures::{Stream, StreamExt};
 use log::{debug, warn};
diff --git a/ballista/core/src/consistent_hash/mod.rs b/ballista/core/src/consistent_hash/mod.rs
index 51fa8add..73767e56 100644
--- a/ballista/core/src/consistent_hash/mod.rs
+++ b/ballista/core/src/consistent_hash/mod.rs
@@ -153,10 +153,10 @@ where
             .range(hashed_key..)
             .chain(self.virtual_nodes.iter())
         {
-            if let Some((node, _)) = self.node_replicas.get(node_name) {
-                if node.is_valid() {
-                    return Some(position_key.clone());
-                }
+            if let Some((node, _)) = self.node_replicas.get(node_name)
+                && node.is_valid()
+            {
+                return Some(position_key.clone());
             }
             if tolerance == 0 {
                 return None;
@@ -177,8 +177,8 @@ pub fn md5_hash(data: &[u8]) -> Vec<u8> {
 
 #[cfg(test)]
 mod test {
-    use crate::consistent_hash::node::Node;
     use crate::consistent_hash::ConsistentHash;
+    use crate::consistent_hash::node::Node;
 
     #[test]
     fn test_topology() {
@@ -219,9 +219,11 @@ mod test {
         for (i, key) in keys.iter().enumerate() {
             if i == 2 {
                 assert!(consistent_hash.get(key.as_bytes()).is_none());
-                assert!(consistent_hash
-                    .get_with_tolerance(key.as_bytes(), 1)
-                    .is_some());
+                assert!(
+                    consistent_hash
+                        .get_with_tolerance(key.as_bytes(), 1)
+                        .is_some()
+                );
             } else {
                 assert_eq!(
                     consistent_hash.get(key.as_bytes()).unwrap().name(),
diff --git a/ballista/core/src/diagram.rs b/ballista/core/src/diagram.rs
index b316d7c9..1af9e08f 100644
--- a/ballista/core/src/diagram.rs
+++ b/ballista/core/src/diagram.rs
@@ -19,6 +19,7 @@ use crate::error::Result;
 use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
 
 use datafusion::datasource::source::DataSourceExec;
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::aggregates::AggregateExec;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -26,12 +27,11 @@ use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::joins::HashJoinExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
-use datafusion::physical_plan::ExecutionPlan;
 use log::warn;
 use std::fs::File;
 use std::io::{BufWriter, Write};
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
 
 pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
     let write_file = File::create(filename)?;
diff --git a/ballista/core/src/event_loop.rs b/ballista/core/src/event_loop.rs
index c82cc231..be970123 100644
--- a/ballista/core/src/event_loop.rs
+++ b/ballista/core/src/event_loop.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
 
 use async_trait::async_trait;
 use log::{error, info};
diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs
index 4336b1fc..8a084f96 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -19,11 +19,11 @@ use crate::client::BallistaClient;
 use crate::config::BallistaConfig;
 use crate::serde::protobuf::SuccessfulJob;
 use crate::serde::protobuf::{
-    execute_query_params::Query, execute_query_result, job_status,
-    scheduler_grpc_client::SchedulerGrpcClient, ExecuteQueryParams, GetJobStatusParams,
-    GetJobStatusResult, KeyValuePair, PartitionLocation,
+    ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
+    PartitionLocation, execute_query_params::Query, execute_query_result, job_status,
+    scheduler_grpc_client::SchedulerGrpcClient,
 };
-use crate::utils::{create_grpc_client_connection, GrpcClientConfig};
+use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::record_batch::RecordBatch;
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index 0635d310..2ebc2fd3 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -52,7 +52,7 @@ use itertools::Itertools;
 use log::{debug, error, trace};
 use rand::prelude::SliceRandom;
 use rand::rng;
-use tokio::sync::{mpsc, Semaphore};
+use tokio::sync::{Semaphore, mpsc};
 use tokio_stream::wrappers::ReceiverStream;
 
 /// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
@@ -165,9 +165,9 @@ impl ExecutionPlan for ShuffleReaderExec {
 
         if force_remote_read {
             debug!(
-            "All shuffle partitions will be read as remote partitions! To disable this behavior set: `{}=false`",
-            crate::config::BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ
-        );
+                "All shuffle partitions will be read as remote partitions! To disable this behavior set: `{}=false`",
+                crate::config::BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ
+            );
         }
 
         log::debug!(
@@ -223,9 +223,7 @@ impl ExecutionPlan for ShuffleReaderExec {
 
             trace!(
                 "shuffle reader at stage: {} and partition {} returned statistics: {:?}",
-                self.stage_id,
-                idx,
-                stat_for_partition
+                self.stage_id, idx, stat_for_partition
             );
             stat_for_partition
         } else {
@@ -236,7 +234,10 @@ impl ExecutionPlan for ShuffleReaderExec {
                     .flatten()
                     .map(|loc| loc.partition_stats),
             );
-            trace!("shuffle reader at stage: {} returned statistics for all partitions: {:?}", self.stage_id, stats_for_partitions);
+            trace!(
+                "shuffle reader at stage: {} returned statistics for all partitions: {:?}",
+                self.stage_id, stats_for_partitions
+            );
             Ok(stats_for_partitions)
         }
     }
@@ -575,7 +576,7 @@ mod tests {
     use datafusion::physical_plan::common;
 
     use datafusion::prelude::SessionContext;
-    use tempfile::{tempdir, TempDir};
+    use tempfile::{TempDir, tempdir};
 
     #[tokio::test]
     async fn test_stats_for_partitions_empty() {
@@ -740,8 +741,8 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_stats_for_partition_statistics_specific_partition_out_of_range(
-    ) -> Result<()> {
+    async fn test_stats_for_partition_statistics_specific_partition_out_of_range()
+    -> Result<()> {
         let schema = Schema::new(vec![
             Field::new("a", DataType::Int32, false),
             Field::new("b", DataType::Int32, false),
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs
index c1be52e3..42cfe03d 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -20,8 +20,8 @@
 //! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
 //! will use the ShuffleReaderExec to read these results.
 
-use datafusion::arrow::ipc::writer::IpcWriteOptions;
 use datafusion::arrow::ipc::CompressionType;
+use datafusion::arrow::ipc::writer::IpcWriteOptions;
 
 use datafusion::arrow::ipc::writer::StreamWriter;
 use std::any::Any;
@@ -51,8 +51,8 @@ use datafusion::physical_plan::metrics::{
 };
 
 use datafusion::physical_plan::{
-    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
-    PlanProperties, SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream, Statistics, displayable,
 };
 use futures::{StreamExt, TryFutureExt, TryStreamExt};
 
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index 49bff2de..f2690174 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -16,9 +16,10 @@
 // under the License.
 
 use crate::config::{
-    BallistaConfig, BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME,
+    BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME,
     BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ, BALLISTA_SHUFFLE_READER_MAX_REQUESTS,
     BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT, BALLISTA_STANDALONE_PARALLELISM,
+    BallistaConfig,
 };
 use crate::planner::BallistaQueryPlanner;
 use crate::serde::protobuf::KeyValuePair;
@@ -563,8 +564,10 @@ mod test {
         let pairs = config.to_key_value_pairs();
 
         assert!(pairs.iter().any(|p| p.key == BALLISTA_JOB_NAME));
-        assert!(pairs
-            .iter()
-            .any(|p| p.key == "datafusion.catalog.information_schema"))
+        assert!(
+            pairs
+                .iter()
+                .any(|p| p.key == "datafusion.catalog.information_schema")
+        )
     }
 }
diff --git a/ballista/core/src/object_store.rs b/ballista/core/src/object_store.rs
index b698ebdc..3f752f9a 100644
--- a/ballista/core/src/object_store.rs
+++ b/ballista/core/src/object_store.rs
@@ -196,10 +196,13 @@ impl CustomObjectStoreRegistry {
         }
 
         if let Some(endpoint) = endpoint {
-            if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
-                if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
-                    return config_err!("Invalid endpoint: {endpoint}. HTTP is not allowed for S3 endpoints. To allow HTTP, set 's3.allow_http' to true");
-                }
+            if let Ok(endpoint_url) = Url::try_from(endpoint.as_str())
+                && !matches!(allow_http, Some(true))
+                && endpoint_url.scheme() == "http"
+            {
+                return config_err!(
+                    "Invalid endpoint: {endpoint}. HTTP is not allowed for S3 endpoints. To allow HTTP, set 's3.allow_http' to true"
+                );
             }
 
             builder = builder.with_endpoint(endpoint);
diff --git a/ballista/core/src/planner.rs b/ballista/core/src/planner.rs
index 266da3c6..bf9f5b7b 100644
--- a/ballista/core/src/planner.rs
+++ b/ballista/core/src/planner.rs
@@ -25,8 +25,8 @@ use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor};
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::{QueryPlanner, SessionState};
 use datafusion::logical_expr::{LogicalPlan, TableScan};
-use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
 use datafusion_proto::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
 use std::marker::PhantomData;
@@ -180,7 +180,7 @@ mod test {
     use datafusion::{
         common::tree_node::TreeNode,
         error::Result,
-        execution::{runtime_env::RuntimeEnvBuilder, SessionStateBuilder},
+        execution::{SessionStateBuilder, runtime_env::RuntimeEnvBuilder},
         prelude::{SessionConfig, SessionContext},
     };
 
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index 08901682..c71c0b70 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -29,10 +29,10 @@ use datafusion_proto::logical_plan::file_formats::{
     ArrowLogicalExtensionCodec, AvroLogicalExtensionCodec, CsvLogicalExtensionCodec,
     JsonLogicalExtensionCodec, ParquetLogicalExtensionCodec,
 };
+use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
 use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
 use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
 use datafusion_proto::physical_plan::to_proto::serialize_partitioning;
-use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
 use datafusion_proto::protobuf::proto_error;
 use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
 use datafusion_proto::{
@@ -491,12 +491,12 @@ struct FileFormatProto {
 mod test {
     use super::*;
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::physical_plan::expressions::col;
     use datafusion::physical_plan::Partitioning;
+    use datafusion::physical_plan::expressions::col;
     use datafusion::{
         common::DFSchema,
-        datasource::file_format::{parquet::ParquetFormatFactory, DefaultFileType},
-        logical_expr::{dml::CopyTo, EmptyRelation, LogicalPlan},
+        datasource::file_format::{DefaultFileType, parquet::ParquetFormatFactory},
+        logical_expr::{EmptyRelation, LogicalPlan, dml::CopyTo},
         prelude::SessionContext,
     };
     use datafusion_proto::{logical_plan::AsLogicalPlan, protobuf::LogicalPlanNode};
diff --git a/ballista/core/src/serde/scheduler/from_proto.rs b/ballista/core/src/serde/scheduler/from_proto.rs
index dc8c53c7..76c0e7d9 100644
--- a/ballista/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/core/src/serde/scheduler/from_proto.rs
@@ -41,9 +41,9 @@ use crate::serde::scheduler::{
     TaskDefinition,
 };
 
-use crate::serde::{protobuf, BallistaCodec};
 use crate::RuntimeProducer;
-use protobuf::{operator_metric, NamedCount, NamedGauge, NamedTime};
+use crate::serde::{BallistaCodec, protobuf};
+use protobuf::{NamedCount, NamedGauge, NamedTime, operator_metric};
 
 impl TryInto<Action> for protobuf::Action {
     type Error = BallistaError;
@@ -90,11 +90,7 @@ impl Into<PartitionStats> for protobuf::PartitionStats {
 }
 
 fn foo(n: i64) -> Option<u64> {
-    if n < 0 {
-        None
-    } else {
-        Some(n as u64)
-    }
+    if n < 0 { None } else { Some(n as u64) }
 }
 
 impl TryInto<PartitionLocation> for protobuf::PartitionLocation {
@@ -291,21 +287,17 @@ impl Into<ExecutorData> for protobuf::ExecutorData {
             available_task_slots: 0,
         };
         for resource in self.resources {
-            if let Some(task_slots) = resource.total {
-                if let Some(protobuf::executor_resource::Resource::TaskSlots(
-                    task_slots,
-                )) = task_slots.resource
-                {
-                    ret.total_task_slots = task_slots
-                }
+            if let Some(task_slots) = resource.total
+                && let Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
+                    task_slots.resource
+            {
+                ret.total_task_slots = task_slots
             };
-            if let Some(task_slots) = resource.available {
-                if let Some(protobuf::executor_resource::Resource::TaskSlots(
-                    task_slots,
-                )) = task_slots.resource
-                {
-                    ret.available_task_slots = task_slots
-                }
+            if let Some(task_slots) = resource.available
+                && let Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
+                    task_slots.resource
+            {
+                ret.available_task_slots = task_slots
             };
         }
         ret
diff --git a/ballista/core/src/serde/scheduler/to_proto.rs b/ballista/core/src/serde/scheduler/to_proto.rs
index 5abb0992..01c9d20e 100644
--- a/ballista/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/core/src/serde/scheduler/to_proto.rs
@@ -29,7 +29,7 @@ use crate::serde::scheduler::{
     PartitionLocation, PartitionStats,
 };
 use datafusion::physical_plan::Partitioning;
-use protobuf::{action::ActionType, operator_metric, NamedCount, NamedGauge, NamedTime};
+use protobuf::{NamedCount, NamedGauge, NamedTime, action::ActionType, operator_metric};
 
 impl TryInto<protobuf::Action> for Action {
     type Error = BallistaError;
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 248dbbe0..f77e1433 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -20,15 +20,15 @@ use crate::error::{BallistaError, Result};
 use crate::extension::SessionConfigExt;
 use crate::serde::scheduler::PartitionStats;
 
+use datafusion::arrow::ipc::CompressionType;
 use datafusion::arrow::ipc::writer::IpcWriteOptions;
 use datafusion::arrow::ipc::writer::StreamWriter;
-use datafusion::arrow::ipc::CompressionType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::execution::context::{SessionConfig, SessionState};
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::physical_plan::metrics::MetricsSet;
-use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream};
+use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream, metrics};
 use futures::StreamExt;
 use log::error;
 use std::sync::Arc;
diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs
index 96d3e1fd..cbeb93b8 100644
--- a/ballista/executor/src/bin/main.rs
+++ b/ballista/executor/src/bin/main.rs
@@ -23,7 +23,7 @@ use ballista_core::object_store::{
 };
 use ballista_executor::config::Config;
 use ballista_executor::executor_process::{
-    start_executor_process, ExecutorProcessConfig,
+    ExecutorProcessConfig, start_executor_process,
 };
 use clap::Parser;
 use std::env;
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 84beff6e..2e7649de 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -30,8 +30,8 @@ use datafusion::physical_plan::{
     SendableRecordBatchStream, Statistics,
 };
 use datafusion::{error::Result, physical_plan::RecordBatchStream};
-use futures::stream::SelectAll;
 use futures::Stream;
+use futures::stream::SelectAll;
 
 /// The CollectExec operator retrieves results from the cluster and returns them as a single
 /// vector of [RecordBatch].
diff --git a/ballista/executor/src/execution_engine.rs b/ballista/executor/src/execution_engine.rs
index c7f9c8a5..eabb515a 100644
--- a/ballista/executor/src/execution_engine.rs
+++ b/ballista/executor/src/execution_engine.rs
@@ -21,8 +21,8 @@ use ballista_core::serde::protobuf::ShuffleWritePartition;
 use ballista_core::utils;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::metrics::MetricsSet;
 use std::fmt::{Debug, Display};
 use std::sync::Arc;
 
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 36b58a13..d97b9926 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -18,15 +18,15 @@
 use crate::cpu_bound_executor::DedicatedExecutor;
 use crate::executor::Executor;
 use crate::executor_process::remove_job_dir;
-use crate::{as_task_status, TaskExecutionTimes};
+use crate::{TaskExecutionTimes, as_task_status};
 use ballista_core::error::BallistaError;
 use ballista_core::extension::SessionConfigHelperExt;
+use ballista_core::serde::BallistaCodec;
 use ballista_core::serde::protobuf::{
-    scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
-    TaskDefinition, TaskStatus,
+    PollWorkParams, PollWorkResult, TaskDefinition, TaskStatus,
+    scheduler_grpc_client::SchedulerGrpcClient,
 };
 use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId};
-use ballista_core::serde::BallistaCodec;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -176,7 +176,9 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionPlan>
                 }
             }
             Err(error) => {
-                warn!("Executor poll work loop failed. If this continues to 
happen the Scheduler might be marked as dead. Error: {error}");
+                warn!(
+                    "Executor poll work loop failed. If this continues to 
happen the Scheduler might be marked as dead. Error: {error}"
+                );
             }
         }
 
diff --git a/ballista/executor/src/executor.rs 
b/ballista/executor/src/executor.rs
index 3723c42e..55b9bc8c 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -22,13 +22,13 @@ use crate::execution_engine::ExecutionEngine;
 use crate::execution_engine::QueryStageExecutor;
 use crate::metrics::ExecutorMetricsCollector;
 use crate::metrics::LoggingMetricsCollector;
+use ballista_core::ConfigProducer;
+use ballista_core::RuntimeProducer;
 use ballista_core::error::BallistaError;
 use ballista_core::registry::BallistaFunctionRegistry;
 use ballista_core::serde::protobuf;
 use ballista_core::serde::protobuf::ExecutorRegistration;
 use ballista_core::serde::scheduler::PartitionId;
-use ballista_core::ConfigProducer;
-use ballista_core::RuntimeProducer;
 use dashmap::DashMap;
 use datafusion::execution::context::TaskContext;
 use datafusion::execution::runtime_env::RuntimeEnv;
@@ -215,11 +215,11 @@ impl Executor {
 mod test {
     use crate::execution_engine::DefaultQueryStageExec;
     use crate::executor::Executor;
+    use ballista_core::RuntimeProducer;
     use ballista_core::execution_plans::ShuffleWriterExec;
     use ballista_core::serde::protobuf::ExecutorRegistration;
     use ballista_core::serde::scheduler::PartitionId;
     use ballista_core::utils::default_config_producer;
-    use ballista_core::RuntimeProducer;
     use datafusion::arrow::datatypes::{Schema, SchemaRef};
     use datafusion::arrow::record_batch::RecordBatch;
     use datafusion::error::{DataFusionError, Result};
diff --git a/ballista/executor/src/executor_process.rs 
b/ballista/executor/src/executor_process.rs
index 4756d89b..4daa53f3 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -19,16 +19,16 @@
 
 use std::net::SocketAddr;
 use std::path::{Path, PathBuf};
-use std::sync::atomic::Ordering;
 use std::sync::Arc;
+use std::sync::atomic::Ordering;
 use std::time::{Duration, Instant, UNIX_EPOCH};
 
 use arrow_flight::flight_service_server::FlightServiceServer;
 use ballista_core::registry::BallistaFunctionRegistry;
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
-use futures::stream::FuturesUnordered;
 use futures::StreamExt;
+use futures::stream::FuturesUnordered;
 use log::{error, info, warn};
 use tempfile::TempDir;
 use tokio::fs::DirEntry;
@@ -46,17 +46,17 @@ use ballista_core::extension::SessionConfigExt;
 use ballista_core::serde::protobuf::executor_resource::Resource;
 use ballista_core::serde::protobuf::executor_status::Status;
 use ballista_core::serde::protobuf::{
-    scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration, 
ExecutorResource,
-    ExecutorSpecification, ExecutorStatus, ExecutorStoppedParams, 
HeartBeatParams,
+    ExecutorRegistration, ExecutorResource, ExecutorSpecification, 
ExecutorStatus,
+    ExecutorStoppedParams, HeartBeatParams, 
scheduler_grpc_client::SchedulerGrpcClient,
 };
 use ballista_core::serde::{
     BallistaCodec, BallistaLogicalExtensionCodec, 
BallistaPhysicalExtensionCodec,
 };
 use ballista_core::utils::{
-    create_grpc_client_connection, create_grpc_server, default_config_producer,
-    get_time_before, GrpcServerConfig,
+    GrpcServerConfig, create_grpc_client_connection, create_grpc_server,
+    default_config_producer, get_time_before,
 };
-use ballista_core::{ConfigProducer, RuntimeProducer, BALLISTA_VERSION};
+use ballista_core::{BALLISTA_VERSION, ConfigProducer, RuntimeProducer};
 
 use crate::execution_engine::ExecutionEngine;
 use crate::executor::{Executor, TasksDrainedFuture};
@@ -65,8 +65,8 @@ use crate::flight_service::BallistaFlightService;
 use crate::metrics::LoggingMetricsCollector;
 use crate::shutdown::Shutdown;
 use crate::shutdown::ShutdownNotifier;
+use crate::{ArrowFlightServerProvider, terminate};
 use crate::{execution_loop, executor_server};
-use crate::{terminate, ArrowFlightServerProvider};
 
 pub struct ExecutorProcessConfig {
     pub bind_host: String,
@@ -493,7 +493,9 @@ async fn flight_server_task(
     grpc_server_config: GrpcServerConfig,
 ) -> JoinHandle<Result<(), BallistaError>> {
     tokio::spawn(async move {
-        info!("Built-in arrow flight server listening on: {address:?} 
max_encoding_size: {max_encoding_message_size} max_decoding_size: 
{max_decoding_message_size}");
+        info!(
+            "Built-in arrow flight server listening on: {address:?} 
max_encoding_size: {max_encoding_message_size} max_decoding_size: 
{max_decoding_message_size}"
+        );
 
         let server_future = create_grpc_server(&grpc_server_config)
             .add_service(
@@ -556,7 +558,9 @@ async fn clean_shuffle_data_loop(
                     Ok(_) => {}
                 }
             } else {
-                warn!("{child_path:?} under the working directory is a not a 
directory and will be ignored when doing cleanup")
+                warn!(
+                    "{child_path:?} under the working directory is a not a 
directory and will be ignored when doing cleanup"
+                )
             }
         } else {
             error!("Fail to get metadata for file {:?}", child.path())
diff --git a/ballista/executor/src/executor_server.rs 
b/ballista/executor/src/executor_server.rs
index 1b40650e..ed2b2504 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -18,8 +18,8 @@
 use ballista_core::BALLISTA_VERSION;
 use std::collections::HashMap;
 use std::convert::TryInto;
-use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use tokio::sync::mpsc;
 
@@ -29,21 +29,21 @@ use tonic::{Request, Response, Status};
 
 use ballista_core::error::BallistaError;
 use ballista_core::extension::SessionConfigExt;
+use ballista_core::serde::BallistaCodec;
 use ballista_core::serde::protobuf::{
-    executor_grpc_server::{ExecutorGrpc, ExecutorGrpcServer},
-    executor_metric, executor_status,
-    scheduler_grpc_client::SchedulerGrpcClient,
     CancelTasksParams, CancelTasksResult, ExecutorMetric, ExecutorStatus,
     HeartBeatParams, LaunchMultiTaskParams, LaunchMultiTaskResult, 
LaunchTaskParams,
     LaunchTaskResult, RegisterExecutorParams, RemoveJobDataParams, 
RemoveJobDataResult,
     StopExecutorParams, StopExecutorResult, TaskStatus, UpdateTaskStatusParams,
+    executor_grpc_server::{ExecutorGrpc, ExecutorGrpcServer},
+    executor_metric, executor_status,
+    scheduler_grpc_client::SchedulerGrpcClient,
 };
+use ballista_core::serde::scheduler::PartitionId;
+use ballista_core::serde::scheduler::TaskDefinition;
 use ballista_core::serde::scheduler::from_proto::{
     get_task_definition, get_task_definition_vec,
 };
-use ballista_core::serde::scheduler::PartitionId;
-use ballista_core::serde::scheduler::TaskDefinition;
-use ballista_core::serde::BallistaCodec;
 use ballista_core::utils::{create_grpc_client_connection, create_grpc_server};
 use dashmap::DashMap;
 use datafusion::execution::TaskContext;
@@ -53,9 +53,9 @@ use tokio::task::JoinHandle;
 
 use crate::cpu_bound_executor::DedicatedExecutor;
 use crate::executor::Executor;
-use crate::executor_process::{remove_job_dir, ExecutorProcessConfig};
+use crate::executor_process::{ExecutorProcessConfig, remove_job_dir};
 use crate::shutdown::ShutdownNotifier;
-use crate::{as_task_status, TaskExecutionTimes};
+use crate::{TaskExecutionTimes, as_task_status};
 
 type ServerHandle = JoinHandle<Result<(), BallistaError>>;
 type SchedulerClients = Arc<DashMap<String, SchedulerGrpcClient<Channel>>>;
@@ -522,7 +522,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskRunnerPool<T,
                             break;
                         }
                         Err(TryRecvError::Disconnected) => {
-                            info!("Channel is closed and will exit the task 
status report loop");
+                            info!(
+                                "Channel is closed and will exit the task 
status report loop"
+                            );
                             drop(tasks_status_complete);
                             return;
                         }
diff --git a/ballista/executor/src/flight_service.rs 
b/ballista/executor/src/flight_service.rs
index 0de791f6..9ff762f0 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -31,9 +31,9 @@ use ballista_core::serde::scheduler::Action as BallistaAction;
 use datafusion::arrow::ipc::CompressionType;
 
 use arrow_flight::{
-    flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
-    FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, 
HandshakeResponse,
-    PollInfo, PutResult, SchemaResult, Ticket,
+    Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, 
FlightInfo,
+    HandshakeRequest, HandshakeResponse, PollInfo, PutResult, SchemaResult, 
Ticket,
+    flight_service_server::FlightService,
 };
 use datafusion::arrow::ipc::writer::IpcWriteOptions;
 use datafusion::arrow::{error::ArrowError, record_batch::RecordBatch};
diff --git a/ballista/executor/src/lib.rs b/ballista/executor/src/lib.rs
index e9322592..15f42a70 100644
--- a/ballista/executor/src/lib.rs
+++ b/ballista/executor/src/lib.rs
@@ -44,8 +44,8 @@ use log::info;
 
 use crate::shutdown::Shutdown;
 use ballista_core::serde::protobuf::{
-    task_status, FailedTask, OperatorMetricsSet, ShuffleWritePartition, 
SuccessfulTask,
-    TaskStatus,
+    FailedTask, OperatorMetricsSet, ShuffleWritePartition, SuccessfulTask, 
TaskStatus,
+    task_status,
 };
 use ballista_core::serde::scheduler::PartitionId;
 use ballista_core::utils::GrpcServerConfig;
diff --git a/ballista/executor/src/standalone.rs 
b/ballista/executor/src/standalone.rs
index d9fa3c51..aa3b12b5 100644
--- a/ballista/executor/src/standalone.rs
+++ b/ballista/executor/src/standalone.rs
@@ -20,14 +20,14 @@ use crate::{execution_loop, executor::Executor, 
flight_service::BallistaFlightSe
 use arrow_flight::flight_service_server::FlightServiceServer;
 use ballista_core::extension::SessionConfigExt;
 use ballista_core::registry::BallistaFunctionRegistry;
-use ballista_core::utils::{default_config_producer, GrpcServerConfig};
+use ballista_core::utils::{GrpcServerConfig, default_config_producer};
 use ballista_core::{
+    BALLISTA_VERSION,
     error::Result,
-    serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, 
ExecutorRegistration},
-    serde::scheduler::ExecutorSpecification,
     serde::BallistaCodec,
+    serde::protobuf::{ExecutorRegistration, 
scheduler_grpc_client::SchedulerGrpcClient},
+    serde::scheduler::ExecutorSpecification,
     utils::create_grpc_server,
-    BALLISTA_VERSION,
 };
 use ballista_core::{ConfigProducer, RuntimeProducer};
 use datafusion::execution::{SessionState, SessionStateBuilder};
diff --git a/ballista/scheduler/src/api/handlers.rs 
b/ballista/scheduler/src/api/handlers.rs
index 8b5ed367..3230a8be 100644
--- a/ballista/scheduler/src/api/handlers.rs
+++ b/ballista/scheduler/src/api/handlers.rs
@@ -10,17 +10,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::SchedulerServer;
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::state::execution_graph::ExecutionStage;
 use crate::state::execution_graph_dot::ExecutionGraphDot;
 use axum::{
+    Json,
     extract::{Path, State},
     response::{IntoResponse, Response},
-    Json,
 };
-use ballista_core::serde::protobuf::job_status::Status;
 use ballista_core::BALLISTA_VERSION;
+use ballista_core::serde::protobuf::job_status::Status;
 use datafusion::physical_plan::metrics::{MetricValue, MetricsSet, Time};
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -30,7 +30,7 @@ use graphviz_rust::{
     exec,
     printer::PrinterContext,
 };
-use http::{header::CONTENT_TYPE, StatusCode};
+use http::{StatusCode, header::CONTENT_TYPE};
 use std::sync::Arc;
 use std::time::Duration;
 
diff --git a/ballista/scheduler/src/api/mod.rs 
b/ballista/scheduler/src/api/mod.rs
index 05cf2718..5dd1aee2 100644
--- a/ballista/scheduler/src/api/mod.rs
+++ b/ballista/scheduler/src/api/mod.rs
@@ -14,7 +14,7 @@ mod handlers;
 
 use crate::scheduler_server::SchedulerServer;
 use axum::routing::patch;
-use axum::{routing::get, Router};
+use axum::{Router, routing::get};
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
 use std::sync::Arc;
diff --git a/ballista/scheduler/src/cluster/event/mod.rs 
b/ballista/scheduler/src/cluster/event/mod.rs
index 4c294491..659cc424 100644
--- a/ballista/scheduler/src/cluster/event/mod.rs
+++ b/ballista/scheduler/src/cluster/event/mod.rs
@@ -20,8 +20,8 @@ use log::debug;
 use parking_lot::RwLock;
 use std::collections::BTreeMap;
 use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::task::{Context, Poll, Waker};
 use tokio::sync::broadcast;
 use tokio::sync::broadcast::error::TryRecvError;
@@ -157,8 +157,8 @@ impl<T: Clone> Stream for EventSubscriber<T> {
 #[cfg(test)]
 mod test {
     use crate::cluster::event::{ClusterEventSender, EventSubscriber};
-    use futures::stream::FuturesUnordered;
     use futures::StreamExt;
+    use futures::stream::FuturesUnordered;
 
     async fn collect_events<T: Clone>(mut rx: EventSubscriber<T>) -> Vec<T> {
         let mut events = vec![];
diff --git a/ballista/scheduler/src/cluster/memory.rs 
b/ballista/scheduler/src/cluster/memory.rs
index 4ea39dbb..c22ee31c 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -16,24 +16,25 @@
 // under the License.
 
 use crate::cluster::{
-    bind_task_bias, bind_task_consistent_hash, bind_task_round_robin, 
get_scan_files,
-    is_skip_consistent_hash, BoundTask, ClusterState, ExecutorSlot, JobState,
-    JobStateEvent, JobStateEventStream, JobStatus, TaskDistributionPolicy, 
TopologyNode,
+    BoundTask, ClusterState, ExecutorSlot, JobState, JobStateEvent, 
JobStateEventStream,
+    JobStatus, TaskDistributionPolicy, TopologyNode, bind_task_bias,
+    bind_task_consistent_hash, bind_task_round_robin, get_scan_files,
+    is_skip_consistent_hash,
 };
 use crate::state::execution_graph::ExecutionGraph;
 use async_trait::async_trait;
+use ballista_core::ConfigProducer;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf::{
-    executor_status, AvailableTaskSlots, ExecutorHeartbeat, ExecutorStatus, 
FailedJob,
-    QueuedJob,
+    AvailableTaskSlots, ExecutorHeartbeat, ExecutorStatus, FailedJob, 
QueuedJob,
+    executor_status,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
-use ballista_core::ConfigProducer;
 use dashmap::DashMap;
 use datafusion::prelude::{SessionConfig, SessionContext};
 
 use crate::cluster::event::ClusterEventSender;
-use crate::scheduler_server::{timestamp_millis, timestamp_secs, 
SessionBuilder};
+use crate::scheduler_server::{SessionBuilder, timestamp_millis, 
timestamp_secs};
 use crate::state::session_manager::create_datafusion_context;
 use crate::state::task_manager::JobInfoCache;
 use ballista_core::serde::protobuf::job_status::Status;
@@ -69,10 +70,10 @@ impl InMemoryClusterState {
     ) -> HashMap<String, TopologyNode> {
         let mut nodes: HashMap<String, TopologyNode> = HashMap::new();
         for (executor_id, slots) in guard.iter() {
-            if let Some(executors) = executors.as_ref() {
-                if !executors.contains(executor_id) {
-                    continue;
-                }
+            if let Some(executors) = executors.as_ref()
+                && !executors.contains(executor_id)
+            {
+                continue;
             }
             if let Some(executor) = self.executors.get(&slots.executor_id) {
                 let node = TopologyNode::new(
diff --git a/ballista/scheduler/src/cluster/mod.rs 
b/ballista/scheduler/src/cluster/mod.rs
index 4942d452..d9ea9e54 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -33,17 +33,17 @@ use log::debug;
 use ballista_core::consistent_hash::ConsistentHash;
 use ballista_core::error::Result;
 use ballista_core::serde::protobuf::{
-    job_status, AvailableTaskSlots, ExecutorHeartbeat, JobStatus,
+    AvailableTaskSlots, ExecutorHeartbeat, JobStatus, job_status,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata, 
PartitionId};
 use ballista_core::utils::{default_config_producer, default_session_builder};
-use ballista_core::{consistent_hash, ConfigProducer};
+use ballista_core::{ConfigProducer, consistent_hash};
 
 use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
 
 use crate::config::{SchedulerConfig, TaskDistributionPolicy};
 use crate::scheduler_server::SessionBuilder;
-use crate::state::execution_graph::{create_task_info, ExecutionGraph, 
TaskDescription};
+use crate::state::execution_graph::{ExecutionGraph, TaskDescription, 
create_task_info};
 use crate::state::task_manager::JobInfoCache;
 
 pub mod event;
@@ -532,7 +532,9 @@ pub(crate) async fn bind_task_consistent_hash(
         total_slots += node.available_slots as usize;
     }
     if total_slots == 0 {
-        debug!("Not enough available executor slots for binding tasks with 
consistent hashing policy!!!");
+        debug!(
+            "Not enough available executor slots for binding tasks with 
consistent hashing policy!!!"
+        );
         return Ok((vec![], None));
     }
     debug!("Total slot number for consistent hash binding is {total_slots}");
@@ -704,16 +706,16 @@ mod test {
     use std::sync::Arc;
 
     use datafusion::datasource::listing::PartitionedFile;
-    use object_store::path::Path;
     use object_store::ObjectMeta;
+    use object_store::path::Path;
 
     use ballista_core::error::Result;
     use ballista_core::serde::protobuf::AvailableTaskSlots;
     use ballista_core::serde::scheduler::{ExecutorMetadata, 
ExecutorSpecification};
 
     use crate::cluster::{
-        bind_task_bias, bind_task_consistent_hash, bind_task_round_robin, 
BoundTask,
-        TopologyNode,
+        BoundTask, TopologyNode, bind_task_bias, bind_task_consistent_hash,
+        bind_task_round_robin,
     };
     use crate::state::execution_graph::ExecutionGraph;
     use crate::state::task_manager::JobInfoCache;
diff --git a/ballista/scheduler/src/cluster/test_util/mod.rs 
b/ballista/scheduler/src/cluster/test_util/mod.rs
index 64da6c60..70269563 100644
--- a/ballista/scheduler/src/cluster/test_util/mod.rs
+++ b/ballista/scheduler/src/cluster/test_util/mod.rs
@@ -20,8 +20,8 @@ use crate::scheduler_server::timestamp_millis;
 use crate::state::execution_graph::ExecutionGraph;
 use crate::test_utils::{await_condition, mock_completed_task, mock_executor};
 use ballista_core::error::Result;
-use ballista_core::serde::protobuf::job_status::Status;
 use ballista_core::serde::protobuf::JobStatus;
+use ballista_core::serde::protobuf::job_status::Status;
 use futures::StreamExt;
 use std::sync::Arc;
 use std::time::Duration;
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 559f1d67..9b055340 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -25,9 +25,9 @@
 //
 #![allow(clippy::uninlined_format_args)]
 
-use crate::cluster::DistributionPolicy;
 use crate::SessionBuilder;
-use ballista_core::{config::TaskSchedulingPolicy, ConfigProducer};
+use crate::cluster::DistributionPolicy;
+use ballista_core::{ConfigProducer, config::TaskSchedulingPolicy};
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use std::fmt::Display;
diff --git a/ballista/scheduler/src/display.rs 
b/ballista/scheduler/src/display.rs
index 5cdd8802..75f85a6f 100644
--- a/ballista/scheduler/src/display.rs
+++ b/ballista/scheduler/src/display.rs
@@ -23,7 +23,7 @@ use ballista_core::utils::collect_plan_metrics;
 use datafusion::logical_expr::{StringifiedPlan, ToStringifiedPlan};
 use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::{
-    accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor,
+    DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, accept,
 };
 use log::{error, info};
 use std::fmt;
diff --git a/ballista/scheduler/src/metrics/prometheus.rs 
b/ballista/scheduler/src/metrics/prometheus.rs
index 9c26145b..032f0f82 100644
--- a/ballista/scheduler/src/metrics/prometheus.rs
+++ b/ballista/scheduler/src/metrics/prometheus.rs
@@ -20,8 +20,8 @@ use ballista_core::error::{BallistaError, Result};
 
 use once_cell::sync::OnceCell;
 use prometheus::{
-    register_counter_with_registry, register_gauge_with_registry,
-    register_histogram_with_registry, Counter, Gauge, Histogram, Registry,
+    Counter, Gauge, Histogram, Registry, register_counter_with_registry,
+    register_gauge_with_registry, register_histogram_with_registry,
 };
 use prometheus::{Encoder, TextEncoder};
 use std::sync::Arc;
diff --git a/ballista/scheduler/src/physical_optimizer/join_selection.rs 
b/ballista/scheduler/src/physical_optimizer/join_selection.rs
index 3fb4d60b..7873ac3e 100644
--- a/ballista/scheduler/src/physical_optimizer/join_selection.rs
+++ b/ballista/scheduler/src/physical_optimizer/join_selection.rs
@@ -36,10 +36,10 @@ use datafusion::common::config::ConfigOptions;
 use datafusion::common::error::Result;
 
 use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion::common::{internal_err, JoinSide, JoinType};
+use datafusion::common::{JoinSide, JoinType, internal_err};
 use datafusion::logical_expr::sort_properties::SortProperties;
-use datafusion::physical_expr::expressions::Column;
 use datafusion::physical_expr::LexOrdering;
+use datafusion::physical_expr::expressions::Column;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::execution_plan::EmissionType;
 use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
@@ -544,19 +544,15 @@ pub fn hash_join_swap_subrule(
     mut input: Arc<dyn ExecutionPlan>,
     _config_options: &ConfigOptions,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-    if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
-        if hash_join.left.boundedness().is_unbounded()
-            && !hash_join.right.boundedness().is_unbounded()
-            && matches!(
-                *hash_join.join_type(),
-                JoinType::Inner
-                    | JoinType::Left
-                    | JoinType::LeftSemi
-                    | JoinType::LeftAnti
-            )
-        {
-            input = swap_join_according_to_unboundedness(hash_join)?;
-        }
+    if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>()
+        && hash_join.left.boundedness().is_unbounded()
+        && !hash_join.right.boundedness().is_unbounded()
+        && matches!(
+            *hash_join.join_type(),
+            JoinType::Inner | JoinType::Left | JoinType::LeftSemi | 
JoinType::LeftAnti
+        )
+    {
+        input = swap_join_according_to_unboundedness(hash_join)?;
     }
     Ok(input)
 }
@@ -612,14 +608,14 @@ mod test {
 
     use datafusion::{
         arrow::datatypes::{DataType, Field, Schema},
-        common::{stats::Precision, ColumnStatistics, JoinSide, JoinType, 
Statistics},
+        common::{ColumnStatistics, JoinSide, JoinType, Statistics, 
stats::Precision},
         config::ConfigOptions,
         logical_expr::Operator,
         physical_plan::{
+            ExecutionPlan,
             expressions::{BinaryExpr, Column},
             joins::utils::{ColumnIndex, JoinFilter},
             test::exec::StatisticsExec,
-            ExecutionPlan,
         },
     };
     //
diff --git a/ballista/scheduler/src/planner.rs 
b/ballista/scheduler/src/planner.rs
index 80fa8440..a185f89b 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -26,13 +26,13 @@ use ballista_core::{
     serde::scheduler::PartitionLocation,
 };
 use datafusion::config::ConfigOptions;
-use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::repartition::RepartitionExec;
 use 
datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion::physical_plan::{
-    with_new_children_if_necessary, ExecutionPlan, Partitioning,
+    ExecutionPlan, Partitioning, with_new_children_if_necessary,
 };
 
 use log::{debug, info};
@@ -338,7 +338,7 @@ mod test {
     use datafusion::physical_plan::sorts::sort::SortExec;
     use 
datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use datafusion::physical_plan::windows::BoundedWindowAggExec;
-    use datafusion::physical_plan::{displayable, ExecutionPlan};
+    use datafusion::physical_plan::{ExecutionPlan, displayable};
     use datafusion::physical_plan::{InputOrderMode, Partitioning};
     use datafusion_proto::physical_plan::AsExecutionPlan;
     use datafusion_proto::protobuf::LogicalPlanNode;
diff --git a/ballista/scheduler/src/scheduler_process.rs 
b/ballista/scheduler/src/scheduler_process.rs
index 4cb4d81c..361ade55 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -15,12 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use ballista_core::BALLISTA_VERSION;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer;
 use ballista_core::serde::{
     BallistaCodec, BallistaLogicalExtensionCodec, 
BallistaPhysicalExtensionCodec,
 };
-use ballista_core::BALLISTA_VERSION;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
 use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
@@ -35,9 +35,9 @@ use crate::cluster::BallistaCluster;
 use crate::config::SchedulerConfig;
 
 use crate::metrics::default_metrics_collector;
+use crate::scheduler_server::SchedulerServer;
 #[cfg(feature = "keda-scaler")]
 use 
crate::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
-use crate::scheduler_server::SchedulerServer;
 
 /// Creates as initialized scheduler service
 /// without exposing it as a grpc service
diff --git a/ballista/scheduler/src/scheduler_server/external_scaler.rs 
b/ballista/scheduler/src/scheduler_server/external_scaler.rs
index f2feb243..021670e4 100644
--- a/ballista/scheduler/src/scheduler_server/external_scaler.rs
+++ b/ballista/scheduler/src/scheduler_server/external_scaler.rs
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::scheduler_server::SchedulerServer;
 use crate::scheduler_server::externalscaler::{
-    external_scaler_server::ExternalScaler, GetMetricSpecResponse, 
GetMetricsRequest,
-    GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, 
ScaledObjectRef,
+    GetMetricSpecResponse, GetMetricsRequest, GetMetricsResponse, 
IsActiveResponse,
+    MetricSpec, MetricValue, ScaledObjectRef, 
external_scaler_server::ExternalScaler,
 };
-use crate::scheduler_server::SchedulerServer;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
 
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs 
b/ballista/scheduler/src/scheduler_server/grpc.rs
index 58f53cba..2860ff1b 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -21,14 +21,15 @@ use ballista_core::extension::SessionConfigHelperExt;
 use ballista_core::serde::protobuf::execute_query_params::Query;
 use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
 use ballista_core::serde::protobuf::{
-    execute_query_failure_result, execute_query_result, AvailableTaskSlots,
-    CancelJobParams, CancelJobResult, CleanJobDataParams, CleanJobDataResult,
-    CreateUpdateSessionParams, CreateUpdateSessionResult, 
ExecuteQueryFailureResult,
-    ExecuteQueryParams, ExecuteQueryResult, ExecuteQuerySuccessResult, 
ExecutorHeartbeat,
-    ExecutorStoppedParams, ExecutorStoppedResult, GetJobStatusParams, 
GetJobStatusResult,
-    HeartBeatParams, HeartBeatResult, PollWorkParams, PollWorkResult,
-    RegisterExecutorParams, RegisterExecutorResult, RemoveSessionParams,
-    RemoveSessionResult, UpdateTaskStatusParams, UpdateTaskStatusResult,
+    AvailableTaskSlots, CancelJobParams, CancelJobResult, CleanJobDataParams,
+    CleanJobDataResult, CreateUpdateSessionParams, CreateUpdateSessionResult,
+    ExecuteQueryFailureResult, ExecuteQueryParams, ExecuteQueryResult,
+    ExecuteQuerySuccessResult, ExecutorHeartbeat, ExecutorStoppedParams,
+    ExecutorStoppedResult, GetJobStatusParams, GetJobStatusResult, 
HeartBeatParams,
+    HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
+    RegisterExecutorResult, RemoveSessionParams, RemoveSessionResult,
+    UpdateTaskStatusParams, UpdateTaskStatusResult, 
execute_query_failure_result,
+    execute_query_result,
 };
 use ballista_core::serde::scheduler::ExecutorMetadata;
 use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -116,16 +117,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
                 TaskDistributionPolicy::RoundRobin => {
                     bind_task_round_robin(available_slots, running_jobs, |_| 
false).await
                 }
-                TaskDistributionPolicy::ConsistentHash{..} => {
+                TaskDistributionPolicy::ConsistentHash { .. } => {
                     return Err(Status::unimplemented(
-                        "ConsistentHash TaskDistribution is not feasible for 
pull-based task scheduling"))
+                        "ConsistentHash TaskDistribution is not feasible for 
pull-based task scheduling",
+                    ));
                 }
 
-                TaskDistributionPolicy::Custom(ref policy) =>{
-                    policy.bind_tasks(available_slots, 
running_jobs).await.map_err(|e| {
-                        Status::internal(e.to_string())
-                    })?
-                }
+                TaskDistributionPolicy::Custom(ref policy) => policy
+                    .bind_tasks(available_slots, running_jobs)
+                    .await
+                    .map_err(|e| Status::internal(e.to_string()))?,
             };
 
             let mut tasks = vec![];
@@ -345,7 +346,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
 
             let job_id = self.state.task_manager.generate_job_id();
 
-            info!("execution query - session_id: {session_id}, operation_id: 
{operation_id}, job_name: {job_name}, job_id: {job_id}");
+            info!(
+                "execution query - session_id: {session_id}, operation_id: 
{operation_id}, job_name: {job_name}, job_id: {job_id}"
+            );
 
             let (session_id, session_ctx) = {
                 let session_config = 
self.state.session_manager.produce_config();
@@ -547,12 +550,12 @@ mod test {
     use crate::config::SchedulerConfig;
     use crate::metrics::default_metrics_collector;
     use ballista_core::error::BallistaError;
+    use ballista_core::serde::BallistaCodec;
     use ballista_core::serde::protobuf::{
-        executor_status, ExecutorRegistration, ExecutorStatus, 
ExecutorStoppedParams,
-        HeartBeatParams, PollWorkParams, RegisterExecutorParams,
+        ExecutorRegistration, ExecutorStatus, ExecutorStoppedParams, 
HeartBeatParams,
+        PollWorkParams, RegisterExecutorParams, executor_status,
     };
     use ballista_core::serde::scheduler::ExecutorSpecification;
-    use ballista_core::serde::BallistaCodec;
 
     use crate::state::SchedulerState;
     use crate::test_utils::await_condition;
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs 
b/ballista/scheduler/src/scheduler_server/mod.rs
index e741601e..8f78c970 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -20,8 +20,8 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use ballista_core::error::Result;
 use ballista_core::event_loop::{EventLoop, EventSender};
-use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::BallistaCodec;
+use ballista_core::serde::protobuf::TaskStatus;
 
 use datafusion::execution::context::SessionState;
 use datafusion::logical_expr::LogicalPlan;
@@ -40,8 +40,8 @@ use 
crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
 
 use crate::state::executor_manager::ExecutorManager;
 
-use crate::state::task_manager::TaskLauncher;
 use crate::state::SchedulerState;
+use crate::state::task_manager::TaskLauncher;
 
 // include the generated protobuf source as a submodule
 #[cfg(feature = "keda-scaler")]
@@ -276,8 +276,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
 
                     let stop_reason = if terminating {
                         format!(
-                        "TERMINATING executor {executor_id} heartbeat timed 
out after {}s", state.config.executor_termination_grace_period,
-                    )
+                            "TERMINATING executor {executor_id} heartbeat 
timed out after {}s",
+                            state.config.executor_termination_grace_period,
+                        )
                     } else {
                         format!(
                             "ACTIVE executor {executor_id} heartbeat timed out 
after {}s",
@@ -388,7 +389,7 @@ mod test {
     use ballista_core::extension::SessionConfigExt;
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use datafusion::functions_aggregate::sum::sum;
-    use datafusion::logical_expr::{col, LogicalPlan};
+    use datafusion::logical_expr::{LogicalPlan, col};
 
     use datafusion::prelude::SessionConfig;
     use datafusion::test_util::scan_empty_with_partitions;
@@ -400,22 +401,22 @@ mod test {
 
     use crate::config::SchedulerConfig;
 
+    use ballista_core::serde::BallistaCodec;
     use ballista_core::serde::protobuf::{
-        failed_task, job_status, task_status, ExecutionError, FailedTask, 
JobStatus,
-        MultiTaskDefinition, ShuffleWritePartition, SuccessfulJob, 
SuccessfulTask,
-        TaskId, TaskStatus,
+        ExecutionError, FailedTask, JobStatus, MultiTaskDefinition,
+        ShuffleWritePartition, SuccessfulJob, SuccessfulTask, TaskId, 
TaskStatus,
+        failed_task, job_status, task_status,
     };
     use ballista_core::serde::scheduler::{
         ExecutorData, ExecutorMetadata, ExecutorSpecification,
     };
-    use ballista_core::serde::BallistaCodec;
 
-    use crate::scheduler_server::{timestamp_millis, SchedulerServer};
+    use crate::scheduler_server::{SchedulerServer, timestamp_millis};
 
     use crate::test_utils::{
+        ExplodingTableProvider, SchedulerTest, TaskRunnerFn, 
TestMetricsCollector,
         assert_completed_event, assert_failed_event, assert_no_submitted_event,
-        assert_submitted_event, test_cluster_context, ExplodingTableProvider,
-        SchedulerTest, TaskRunnerFn, TestMetricsCollector,
+        assert_submitted_event, test_cluster_context,
     };
 
     #[tokio::test]
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs 
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 2d151c1e..97e7f856 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -279,15 +279,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
             QueryStageSchedulerEvent::ExecutorLost(executor_id, _) => {
                 match 
self.state.task_manager.executor_lost(&executor_id).await {
                     Ok(tasks) => {
-                        if !tasks.is_empty() {
-                            if let Err(e) = self
+                        if !tasks.is_empty()
+                            && let Err(e) = self
                                 .state
                                 .executor_manager
                                 .cancel_running_tasks(tasks)
                                 .await
-                            {
-                                warn!("Fail to cancel running tasks due to 
{e:?}");
-                            }
+                        {
+                            warn!("Fail to cancel running tasks due to {e:?}");
                         }
                     }
                     Err(e) => {
@@ -335,12 +334,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
 #[cfg(test)]
 mod tests {
     use crate::config::SchedulerConfig;
-    use crate::test_utils::{await_condition, SchedulerTest, 
TestMetricsCollector};
+    use crate::test_utils::{SchedulerTest, TestMetricsCollector, 
await_condition};
     use ballista_core::config::TaskSchedulingPolicy;
     use ballista_core::error::Result;
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use datafusion::functions_aggregate::sum::sum;
-    use datafusion::logical_expr::{col, LogicalPlan};
+    use datafusion::logical_expr::{LogicalPlan, col};
     use datafusion::test_util::scan_empty_with_partitions;
     use std::sync::Arc;
     use std::time::Duration;
diff --git a/ballista/scheduler/src/standalone.rs 
b/ballista/scheduler/src/standalone.rs
index c4e6295d..cf835a75 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -19,16 +19,16 @@ use crate::cluster::BallistaCluster;
 use crate::config::SchedulerConfig;
 use crate::metrics::default_metrics_collector;
 use crate::scheduler_server::SchedulerServer;
+use ballista_core::ConfigProducer;
 use ballista_core::extension::SessionConfigExt;
 use ballista_core::serde::BallistaCodec;
 use ballista_core::utils::{
-    create_grpc_server, default_config_producer, default_session_builder,
-    GrpcServerConfig,
+    GrpcServerConfig, create_grpc_server, default_config_producer,
+    default_session_builder,
 };
-use ballista_core::ConfigProducer;
 use ballista_core::{
-    error::Result, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
-    BALLISTA_VERSION,
+    BALLISTA_VERSION, error::Result,
+    serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
 };
 use datafusion::execution::SessionState;
 use datafusion::prelude::SessionConfig;
diff --git a/ballista/scheduler/src/state/distributed_explain.rs 
b/ballista/scheduler/src/state/distributed_explain.rs
index 8d602a4f..340aa501 100644
--- a/ballista/scheduler/src/state/distributed_explain.rs
+++ b/ballista/scheduler/src/state/distributed_explain.rs
@@ -25,13 +25,13 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::common::{ScalarValue, UnnestOptions};
 use datafusion::logical_expr::{LogicalPlan, PlanType, StringifiedPlan};
 use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::expressions::col;
 use datafusion::physical_plan::expressions::lit;
 use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
-use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 
 use crate::state::execution_graph::ExecutionStage;
diff --git a/ballista/scheduler/src/state/execution_graph.rs 
b/ballista/scheduler/src/state/execution_graph.rs
index ce2eb717..846907ff 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
-use datafusion::physical_plan::{accept, ExecutionPlan, ExecutionPlanVisitor};
+use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanVisitor, accept};
 use datafusion::prelude::SessionConfig;
 use log::{debug, error, info, warn};
 
@@ -31,11 +31,11 @@ use ballista_core::error::{BallistaError, Result};
 use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
 use ballista_core::serde::protobuf::failed_task::FailedReason;
 use ballista_core::serde::protobuf::job_status::Status;
-use ballista_core::serde::protobuf::{job_status, FailedJob, 
ShuffleWritePartition};
-use ballista_core::serde::protobuf::{task_status, RunningTask};
+use ballista_core::serde::protobuf::{FailedJob, ShuffleWritePartition, 
job_status};
 use ballista_core::serde::protobuf::{
     FailedTask, JobStatus, ResultLost, RunningJob, SuccessfulJob, TaskStatus,
 };
+use ballista_core::serde::protobuf::{RunningTask, task_status};
 use ballista_core::serde::scheduler::{
     ExecutorMetadata, PartitionId, PartitionLocation, PartitionStats,
 };
@@ -314,8 +314,14 @@ impl ExecutionGraph {
                         let task_stage_attempt_num =
                             task_status.stage_attempt_num as usize;
                         if task_stage_attempt_num < 
running_stage.stage_attempt_num {
-                            warn!("Ignore TaskStatus update with TID {} as 
it's from Stage {}.{} and there is a more recent stage attempt {}.{} running",
-                                    task_status.task_id, stage_id, 
task_stage_attempt_num, stage_id, running_stage.stage_attempt_num);
+                            warn!(
+                                "Ignore TaskStatus update with TID {} as it's 
from Stage {}.{} and there is a more recent stage attempt {}.{} running",
+                                task_status.task_id,
+                                stage_id,
+                                task_stage_attempt_num,
+                                stage_id,
+                                running_stage.stage_attempt_num
+                            );
                             continue;
                         }
                         let partition_id = task_status.clone().partition_id as 
usize;
@@ -359,7 +365,8 @@ impl ExecutionGraph {
 
                                         if !failed_stages.is_empty() {
                                             let error_msg = format!(
-                                                "Stages was marked failed, 
ignore FetchPartitionError from task {task_identity}");
+                                                "Stages was marked failed, 
ignore FetchPartitionError from task {task_identity}"
+                                            );
                                             warn!("{error_msg}");
                                         } else {
                                             // There are different removal 
strategies here.
@@ -385,7 +392,9 @@ impl ExecutionGraph {
                                                     .entry(map_stage_id)
                                                     .or_default();
                                             
missing_inputs.extend(removed_map_partitions);
-                                            warn!("Need to resubmit the 
current running Stage {stage_id} and its map Stage {map_stage_id} due to 
FetchPartitionError from task {task_identity}")
+                                            warn!(
+                                                "Need to resubmit the current 
running Stage {stage_id} and its map Stage {map_stage_id} due to 
FetchPartitionError from task {task_identity}"
+                                            )
                                         }
                                     } else {
                                         let error_msg = format!(
@@ -415,7 +424,10 @@ impl ExecutionGraph {
                                         } else {
                                             let error_msg = format!(
                                                 "Task {} in Stage {} failed {} 
times, fail the stage, most recent failure reason: {:?}",
-                                                partition_id, stage_id, 
max_task_failures, failed_task.error
+                                                partition_id,
+                                                stage_id,
+                                                max_task_failures,
+                                                failed_task.error
                                             );
                                             error!("{error_msg}");
                                             failed_stages.insert(stage_id, 
error_msg);
@@ -428,7 +440,8 @@ impl ExecutionGraph {
                                 }
                                 None => {
                                     let error_msg = format!(
-                                        "Task {partition_id} in Stage 
{stage_id} failed with unknown failure reasons, fail the stage");
+                                        "Task {partition_id} in Stage 
{stage_id} failed with unknown failure reasons, fail the stage"
+                                    );
                                     error!("{error_msg}");
                                     failed_stages.insert(stage_id, error_msg);
                                 }
@@ -499,68 +512,64 @@ impl ExecutionGraph {
                         // handle delayed failed tasks if the stage's next 
attempt is still in UnResolved status.
                         if let Some(task_status::Status::Failed(failed_task)) =
                             task_status.status
-                        {
-                            if unsolved_stage.stage_attempt_num - 
task_stage_attempt_num
+                            && unsolved_stage.stage_attempt_num - 
task_stage_attempt_num
                                 == 1
-                            {
-                                let failed_reason = failed_task.failed_reason;
-                                match failed_reason {
-                                    Some(FailedReason::ExecutionError(_)) => {
-                                        should_ignore = false;
-                                        failed_stages.insert(stage_id, 
failed_task.error);
+                        {
+                            let failed_reason = failed_task.failed_reason;
+                            match failed_reason {
+                                Some(FailedReason::ExecutionError(_)) => {
+                                    should_ignore = false;
+                                    failed_stages.insert(stage_id, 
failed_task.error);
+                                }
+                                Some(FailedReason::FetchPartitionError(
+                                    fetch_partiton_error,
+                                )) if failed_stages.is_empty()
+                                    && current_running_stages.contains(
+                                        &(fetch_partiton_error.map_stage_id as 
usize),
+                                    )
+                                    && !unsolved_stage
+                                        .last_attempt_failure_reasons
+                                        
.contains(&fetch_partiton_error.executor_id) =>
+                                {
+                                    should_ignore = false;
+                                    unsolved_stage
+                                        .last_attempt_failure_reasons
+                                        
.insert(fetch_partiton_error.executor_id.clone());
+                                    let map_stage_id =
+                                        fetch_partiton_error.map_stage_id as 
usize;
+                                    let map_partition_id =
+                                        fetch_partiton_error.map_partition_id 
as usize;
+                                    let executor_id = 
fetch_partiton_error.executor_id;
+                                    let removed_map_partitions = unsolved_stage
+                                        .remove_input_partitions(
+                                            map_stage_id,
+                                            map_partition_id,
+                                            &executor_id,
+                                        )?;
+
+                                    let missing_inputs = reset_running_stages
+                                        .entry(map_stage_id)
+                                        .or_default();
+                                    
missing_inputs.extend(removed_map_partitions);
+                                    warn!(
+                                        "Need to reset the current running 
Stage {map_stage_id} due to late come FetchPartitionError from its parent stage 
{stage_id} of task {task_identity}"
+                                    );
+
+                                    // If the previous other task updates had 
already mark the map stage success, need to remove it.
+                                    if 
successful_stages.contains(&map_stage_id) {
+                                        
successful_stages.remove(&map_stage_id);
                                     }
-                                    Some(FailedReason::FetchPartitionError(
-                                        fetch_partiton_error,
-                                    )) if failed_stages.is_empty()
-                                        && current_running_stages.contains(
-                                            
&(fetch_partiton_error.map_stage_id as usize),
-                                        )
-                                        && !unsolved_stage
-                                            .last_attempt_failure_reasons
-                                            .contains(
-                                                
&fetch_partiton_error.executor_id,
-                                            ) =>
-                                    {
-                                        should_ignore = false;
-                                        unsolved_stage
-                                            .last_attempt_failure_reasons
-                                            .insert(
-                                                
fetch_partiton_error.executor_id.clone(),
-                                            );
-                                        let map_stage_id =
-                                            fetch_partiton_error.map_stage_id 
as usize;
-                                        let map_partition_id = 
fetch_partiton_error
-                                            .map_partition_id
-                                            as usize;
-                                        let executor_id =
-                                            fetch_partiton_error.executor_id;
-                                        let removed_map_partitions = 
unsolved_stage
-                                            .remove_input_partitions(
-                                                map_stage_id,
-                                                map_partition_id,
-                                                &executor_id,
-                                            )?;
-
-                                        let missing_inputs = 
reset_running_stages
-                                            .entry(map_stage_id)
-                                            .or_default();
-                                        
missing_inputs.extend(removed_map_partitions);
-                                        warn!("Need to reset the current 
running Stage {map_stage_id} due to late come FetchPartitionError from its 
parent stage {stage_id} of task {task_identity}");
-
-                                        // If the previous other task updates 
had already mark the map stage success, need to remove it.
-                                        if 
successful_stages.contains(&map_stage_id) {
-                                            
successful_stages.remove(&map_stage_id);
-                                        }
-                                        if resolved_stages.contains(&stage_id) 
{
-                                            resolved_stages.remove(&stage_id);
-                                        }
+                                    if resolved_stages.contains(&stage_id) {
+                                        resolved_stages.remove(&stage_id);
                                     }
-                                    _ => {}
                                 }
+                                _ => {}
                             }
                         }
                         if should_ignore {
-                            warn!("Ignore TaskStatus update of task with TID 
{task_identity} as the Stage {job_id}/{stage_id} is in UnResolved status");
+                            warn!(
+                                "Ignore TaskStatus update of task with TID 
{task_identity} as the Stage {job_id}/{stage_id} is in UnResolved status"
+                            );
                         }
                     }
                 } else {
@@ -568,7 +577,10 @@ impl ExecutionGraph {
                         "Stage {}/{} is not in running when updating the 
status of tasks {:?}",
                         job_id,
                         stage_id,
-                        stage_task_statuses.into_iter().map(|task_status| 
task_status.partition_id).collect::<Vec<_>>(),
+                        stage_task_statuses
+                            .into_iter()
+                            .map(|task_status| task_status.partition_id)
+                            .collect::<Vec<_>>(),
                     );
                 }
             } else {
@@ -605,7 +617,8 @@ impl ExecutionGraph {
                     }
                 } else {
                     warn!(
-                        "Stage {job_id}/{stage_id} is not in Successful state 
when try to resubmit this stage. ");
+                        "Stage {job_id}/{stage_id} is not in Successful state 
when try to resubmit this stage. "
+                    );
                 }
             } else {
                 return Err(BallistaError::Internal(format!(
@@ -628,7 +641,8 @@ impl ExecutionGraph {
                     }
                 } else {
                     warn!(
-                        "Stage {job_id}/{stage_id} is not in Running state 
when try to reset the running task. ");
+                        "Stage {job_id}/{stage_id} is not in Running state 
when try to reset the running task. "
+                    );
                 }
             } else {
                 return Err(BallistaError::Internal(format!(
@@ -1324,8 +1338,15 @@ impl Debug for ExecutionGraph {
             .map(|stage| format!("{stage:?}"))
             .collect::<Vec<String>>()
             .join("");
-        write!(f, "ExecutionGraph[job_id={}, session_id={}, 
available_tasks={}, is_successful={}]\n{}",
-               self.job_id, self.session_id, self.available_tasks(), 
self.is_successful(), stages)
+        write!(
+            f,
+            "ExecutionGraph[job_id={}, session_id={}, available_tasks={}, 
is_successful={}]\n{}",
+            self.job_id,
+            self.session_id,
+            self.available_tasks(),
+            self.is_successful(),
+            stages
+        )
     }
 }
 
@@ -1531,8 +1552,8 @@ mod test {
     use crate::scheduler_server::event::QueryStageSchedulerEvent;
     use ballista_core::error::Result;
     use ballista_core::serde::protobuf::{
-        self, failed_task, job_status, ExecutionError, FailedTask, 
FetchPartitionError,
-        IoError, JobStatus, TaskKilled,
+        self, ExecutionError, FailedTask, FetchPartitionError, IoError, 
JobStatus,
+        TaskKilled, failed_task, job_status,
     };
 
     use crate::state::execution_graph::ExecutionGraph;
@@ -1887,7 +1908,9 @@ mod test {
         assert_eq!(last_attempt, 3);
 
         let failure_reason = format!("{:?}", agg_graph.status);
-        assert!(failure_reason.contains("Task 1 in Stage 2 failed 4 times, 
fail the stage, most recent failure reason"));
+        assert!(failure_reason.contains(
+            "Task 1 in Stage 2 failed 4 times, fail the stage, most recent 
failure reason"
+        ));
         assert!(failure_reason.contains("IOError"));
         assert!(!agg_graph.is_successful());
 
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs 
b/ballista/scheduler/src/state/execution_graph_dot.rs
index e95a1b5a..49ef7a0a 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -219,11 +219,11 @@ fn sanitize(str: &str, max_len: Option<usize>) -> String {
     }
     // truncate after translation because we know we only have ASCII chars at 
this point
     // so the slice is safe (not splitting unicode character bytes)
-    if let Some(limit) = max_len {
-        if sanitized.len() > limit {
-            sanitized.truncate(limit);
-            return sanitized + " ...";
-        }
+    if let Some(limit) = max_len
+        && sanitized.len() > limit
+    {
+        sanitized.truncate(limit);
+        return sanitized + " ...";
     }
     sanitized
 }
diff --git a/ballista/scheduler/src/state/execution_stage.rs 
b/ballista/scheduler/src/state/execution_stage.rs
index 57e53467..e4324249 100644
--- a/ballista/scheduler/src/state/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_stage.rs
@@ -34,10 +34,10 @@ use log::{debug, warn};
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::protobuf::failed_task::FailedReason;
-use ballista_core::serde::protobuf::{task_status, RunningTask};
 use ballista_core::serde::protobuf::{
     FailedTask, OperatorMetricsSet, ResultLost, SuccessfulTask, TaskStatus,
 };
+use ballista_core::serde::protobuf::{RunningTask, task_status};
 use ballista_core::serde::scheduler::PartitionLocation;
 
 use crate::display::DisplayableBallistaExecutionPlan;
@@ -302,7 +302,10 @@ impl UnresolvedStage {
                 stage_inputs.add_partition(partition);
             }
         } else {
-            return Err(BallistaError::Internal(format!("Error adding input 
partitions to stage {}, {} is not a valid child stage ID", self.stage_id, 
stage_id)));
+            return Err(BallistaError::Internal(format!(
+                "Error adding input partitions to stage {}, {} is not a valid 
child stage ID",
+                self.stage_id, stage_id
+            )));
         }
 
         Ok(())
@@ -333,7 +336,10 @@ impl UnresolvedStage {
             stage_output.complete = false;
             Ok(bad_map_partitions)
         } else {
-            Err(BallistaError::Internal(format!("Error remove input partition 
for Stage {}, {} is not a valid child stage ID", self.stage_id, 
input_stage_id)))
+            Err(BallistaError::Internal(format!(
+                "Error remove input partition for Stage {}, {} is not a valid 
child stage ID",
+                self.stage_id, input_stage_id
+            )))
         }
     }
 
@@ -623,8 +629,10 @@ impl RunningStage {
         let task_info = self.task_infos[partition_id].as_ref().unwrap();
         let task_id = task_info.task_id;
         if (status.task_id as usize) < task_id {
-            warn!("Ignore TaskStatus update with TID {} because there is more 
recent task attempt with TID {} running for partition {}",
-                status.task_id, task_id, partition_id);
+            warn!(
+                "Ignore TaskStatus update with TID {} because there is more 
recent task attempt with TID {} running for partition {}",
+                status.task_id, task_id, partition_id
+            );
             return false;
         }
         let scheduled_time = task_info.scheduled_time;
@@ -667,8 +675,14 @@ impl RunningStage {
 
         let new_metrics_set = if let Some(combined_metrics) = &mut 
self.stage_metrics {
             if metrics.len() != combined_metrics.len() {
-                return Err(BallistaError::Internal(format!("Error updating 
task metrics to stage {}, task metrics array size {} does not equal \
-                with the stage metrics array size {} for task {}", 
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
+                return Err(BallistaError::Internal(format!(
+                    "Error updating task metrics to stage {}, task metrics 
array size {} does not equal \
+                with the stage metrics array size {} for task {}",
+                    self.stage_id,
+                    metrics.len(),
+                    combined_metrics.len(),
+                    partition
+                )));
             }
             let metrics_values_array = metrics
                 .into_iter()
@@ -776,7 +790,10 @@ impl RunningStage {
             stage_output.complete = false;
             Ok(bad_map_partitions)
         } else {
-            Err(BallistaError::Internal(format!("Error remove input partition 
for Stage {}, {} is not a valid child stage ID", self.stage_id, 
input_stage_id)))
+            Err(BallistaError::Internal(format!(
+                "Error remove input partition for Stage {}, {} is not a valid 
child stage ID",
+                self.stage_id, input_stage_id
+            )))
         }
     }
 }
diff --git a/ballista/scheduler/src/state/executor_manager.rs 
b/ballista/scheduler/src/state/executor_manager.rs
index d5f7297b..26ffa02d 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -30,12 +30,12 @@ use crate::state::task_manager::JobInfoCache;
 use ballista_core::extension::SessionConfigExt;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
 use ballista_core::serde::protobuf::{
-    executor_status, CancelTasksParams, ExecutorHeartbeat, MultiTaskDefinition,
-    RemoveJobDataParams, StopExecutorParams,
+    CancelTasksParams, ExecutorHeartbeat, MultiTaskDefinition, 
RemoveJobDataParams,
+    StopExecutorParams, executor_status,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
 use ballista_core::utils::{
-    create_grpc_client_connection, get_time_before, GrpcClientConfig,
+    GrpcClientConfig, create_grpc_client_connection, get_time_before,
 };
 use dashmap::DashMap;
 use log::{debug, error, info, warn};
@@ -201,8 +201,8 @@ impl ExecutorManager {
                             .await
                         {
                             warn!(
-                                    "Failed to call remove_job_data on 
Executor {executor} due to {err:?}"
-                                )
+                                "Failed to call remove_job_data on Executor 
{executor} due to {err:?}"
+                            )
                         }
                     });
                 } else {
@@ -248,8 +248,7 @@ impl ExecutorManager {
     pub async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> 
Result<()> {
         trace!(
             "save executor metadata {} with {} task slots (pull-based 
registration)",
-            metadata.id,
-            metadata.specification.task_slots
+            metadata.id, metadata.specification.task_slots
         );
         self.cluster_state.save_executor_metadata(metadata).await
     }
diff --git a/ballista/scheduler/src/state/mod.rs 
b/ballista/scheduler/src/state/mod.rs
index 76139272..d6a4ceef 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -40,8 +40,8 @@ use crate::config::SchedulerConfig;
 use crate::state::execution_graph::TaskDescription;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::event_loop::EventSender;
-use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::BallistaCodec;
+use ballista_core::serde::protobuf::TaskStatus;
 use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::empty::EmptyExec;
@@ -197,13 +197,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
                     if_revive = true;
                 }
             }
-            if if_revive {
-                if let Err(e) = sender
+            if if_revive
+                && let Err(e) = sender
                     .post_event(QueryStageSchedulerEvent::ReviveOffers)
                     .await
-                {
-                    error!("Fail to send revive offers event due to {e:?}");
-                }
+            {
+                error!("Fail to send revive offers event due to {e:?}");
             }
         });
 
@@ -229,12 +228,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
 
         match self.task_manager.executor_lost(executor_id).await {
             Ok(tasks) => {
-                if !tasks.is_empty() {
-                    if let Err(e) =
+                if !tasks.is_empty()
+                    && let Err(e) =
                         self.executor_manager.cancel_running_tasks(tasks).await
-                    {
-                        warn!("Fail to cancel running tasks due to {e:?}");
-                    }
+                {
+                    warn!("Fail to cancel running tasks due to {e:?}");
                 }
             }
             Err(e) => {
@@ -308,7 +306,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
                         }
                     }
                     Err(e) => {
-                        error!("Failed to launch new task, could not get 
executor metadata: {e}");
+                        error!(
+                            "Failed to launch new task, could not get executor 
metadata: {e}"
+                        );
                         false
                     }
                 };
diff --git a/ballista/scheduler/src/state/session_manager.rs 
b/ballista/scheduler/src/state/session_manager.rs
index 7538aff6..eefa9e35 100644
--- a/ballista/scheduler/src/state/session_manager.rs
+++ b/ballista/scheduler/src/state/session_manager.rs
@@ -61,7 +61,9 @@ pub fn create_datafusion_context(
             // should we disable catalog on the scheduler side
             .with_round_robin_repartition(false);
 
-        log::warn!("session manager will override 
`datafusion.optimizer.enable_round_robin_repartition` to `false` ");
+        log::warn!(
+            "session manager will override 
`datafusion.optimizer.enable_round_robin_repartition` to `false` "
+        );
         session_builder(session_config)?
     } else {
         session_builder(session_config.clone())?
diff --git a/ballista/scheduler/src/state/task_manager.rs 
b/ballista/scheduler/src/state/task_manager.rs
index e49c7b51..66376a7f 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -30,11 +30,11 @@ use datafusion::prelude::SessionConfig;
 use rand::distr::Alphanumeric;
 
 use crate::cluster::JobState;
+use ballista_core::serde::BallistaCodec;
 use ballista_core::serde::protobuf::{
-    job_status, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId, 
TaskStatus,
+    JobStatus, MultiTaskDefinition, TaskDefinition, TaskId, TaskStatus, 
job_status,
 };
 use ballista_core::serde::scheduler::ExecutorMetadata;
-use ballista_core::serde::BallistaCodec;
 use dashmap::DashMap;
 
 use datafusion::physical_plan::ExecutionPlan;
@@ -42,7 +42,7 @@ use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
 use datafusion_proto::protobuf::PhysicalPlanNode;
 use log::{debug, error, info, trace, warn};
-use rand::{rng, Rng};
+use rand::{Rng, rng};
 use std::collections::{HashMap, HashSet};
 use std::ops::Deref;
 use std::sync::Arc;
@@ -364,7 +364,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                 )?
             } else {
                 // TODO Deal with curator changed case
-                error!("Fail to find job {job_id} in the active cache and it 
may not be curated by this scheduler");
+                error!(
+                    "Fail to find job {job_id} in the active cache and it may 
not be curated by this scheduler"
+                );
                 vec![]
             };
 
@@ -431,7 +433,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
             (running_tasks, pending_tasks)
         } else {
             // TODO listen the job state update event and fix task cancelling
-            warn!("Fail to find job {job_id} in the cache, unable to cancel 
tasks for job, fail the job state only.");
+            warn!(
+                "Fail to find job {job_id} in the cache, unable to cancel 
tasks for job, fail the job state only."
+            );
             (vec![], 0)
         };
 
@@ -587,7 +591,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                     .iter()
                     .map(|task| task.partition.partition_id)
                     .collect();
-                debug!("Preparing multi task definition for tasks {task_ids:?} 
belonging to job stage {job_id}/{stage_id}");
+                debug!(
+                    "Preparing multi task definition for tasks {task_ids:?} 
belonging to job stage {job_id}/{stage_id}"
+                );
                 trace!("With task details {tasks:?}");
             }
 
@@ -626,7 +632,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
 
                 Ok(multi_tasks)
             } else {
-                Err(BallistaError::General(format!("Cannot prepare multi task 
definition for job {job_id} which is not in active cache")))
+                Err(BallistaError::General(format!(
+                    "Cannot prepare multi task definition for job {job_id} 
which is not in active cache"
+                )))
             }
         } else {
             Err(BallistaError::General(
@@ -669,7 +677,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
     /// Clean up a failed job in FailedJobs Keyspace by delayed 
clean_up_interval seconds
     pub(crate) fn clean_up_job_delayed(&self, job_id: String, 
clean_up_interval: u64) {
         if clean_up_interval == 0 {
-            info!("The interval is 0 and the clean up for the failed job state 
{job_id} will not triggered");
+            info!(
+                "The interval is 0 and the clean up for the failed job state 
{job_id} will not triggered"
+            );
             return;
         }
 
diff --git a/ballista/scheduler/src/test_utils.rs 
b/ballista/scheduler/src/test_utils.rs
index 5014a7e5..e30ec4d1 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -29,29 +29,29 @@ use async_trait::async_trait;
 use crate::config::SchedulerConfig;
 use crate::metrics::SchedulerMetricsCollector;
 use crate::planner::DefaultDistributedPlanner;
-use crate::scheduler_server::{timestamp_millis, SchedulerServer};
+use crate::scheduler_server::{SchedulerServer, timestamp_millis};
 
 use crate::state::executor_manager::ExecutorManager;
 use crate::state::task_manager::TaskLauncher;
 
 use ballista_core::serde::protobuf::job_status::Status;
 use ballista_core::serde::protobuf::{
-    task_status, FailedTask, JobStatus, MultiTaskDefinition, 
ShuffleWritePartition,
-    SuccessfulTask, TaskId, TaskStatus,
+    FailedTask, JobStatus, MultiTaskDefinition, ShuffleWritePartition, 
SuccessfulTask,
+    TaskId, TaskStatus, task_status,
 };
 use ballista_core::serde::scheduler::{
     ExecutorData, ExecutorMetadata, ExecutorSpecification,
 };
-use ballista_core::serde::{protobuf, BallistaCodec};
+use ballista_core::serde::{BallistaCodec, protobuf};
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::common::DataFusionError;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::execution::context::{SessionConfig, SessionContext};
 use datafusion::functions_aggregate::{count::count, sum::sum};
 use datafusion::logical_expr::{Expr, LogicalPlan, SortExpr};
-use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::{col, CsvReadOptions, JoinType};
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::prelude::{CsvReadOptions, JoinType, col};
 use datafusion::test_util::scan_empty_with_partitions;
 
 use crate::cluster::BallistaCluster;
@@ -61,7 +61,7 @@ use crate::state::execution_graph::{ExecutionGraph, 
ExecutionStage, TaskDescript
 use ballista_core::utils::{default_config_producer, default_session_builder};
 use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
 use parking_lot::Mutex;
-use tokio::sync::mpsc::{channel, Receiver, Sender};
+use tokio::sync::mpsc::{Receiver, Sender, channel};
 
 pub const TPCH_TABLES: &[&str] = &[
     "part", "supplier", "partsupp", "customer", "orders", "lineitem", 
"nation", "region",
@@ -552,7 +552,7 @@ impl SchedulerTest {
             {
                 match inner {
                     Status::Failed(_) | Status::Successful(_) => {
-                        break Ok(status.unwrap())
+                        break Ok(status.unwrap());
                     }
                     _ => {
                         if time >= timeout_ms {
@@ -587,7 +587,7 @@ impl SchedulerTest {
             {
                 match inner {
                     Status::Failed(_) | Status::Successful(_) => {
-                        break Ok(status.unwrap())
+                        break Ok(status.unwrap());
                     }
                     _ => continue,
                 }
@@ -641,7 +641,7 @@ impl SchedulerTest {
             {
                 match inner {
                     Status::Failed(_) | Status::Successful(_) => {
-                        break Ok(status.unwrap())
+                        break Ok(status.unwrap());
                     }
                     _ => continue,
                 }
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 9dbf4690..ee6502aa 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -19,7 +19,7 @@
 name = "ballista-benchmarks"
 description = "Ballista Benchmarks"
 version = "50.0.0"
-edition = "2021"
+edition = "2024"
 authors = ["Apache DataFusion <[email protected]>"]
 homepage = "https://datafusion.apache.org/ballista/"
 repository = "https://github.com/apache/datafusion-ballista"
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 86a4187e..d1401a65 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -26,19 +26,19 @@ use datafusion::common::{DEFAULT_CSV_EXTENSION, 
DEFAULT_PARQUET_EXTENSION};
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
-use datafusion::execution::context::SessionState;
 use datafusion::execution::SessionStateBuilder;
+use datafusion::execution::context::SessionState;
 use datafusion::logical_expr::LogicalPlan;
-use datafusion::logical_expr::{expr::Cast, Expr};
+use datafusion::logical_expr::{Expr, expr::Cast};
 use datafusion::parquet::basic::Compression;
 use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
 use datafusion::{
-    arrow::datatypes::{DataType, Field, Schema},
-    datasource::file_format::{csv::CsvFormat, FileFormat},
     DATAFUSION_VERSION,
+    arrow::datatypes::{DataType, Field, Schema},
+    datasource::file_format::{FileFormat, csv::CsvFormat},
 };
 use datafusion::{
     arrow::record_batch::RecordBatch, 
datasource::file_format::parquet::ParquetFormat,
@@ -609,7 +609,7 @@ async fn register_tables(
             other => {
                 return Err(DataFusionError::Plan(format!(
                     "Invalid file format '{other}'"
-                )))
+                )));
             }
         }
     }
@@ -646,7 +646,7 @@ fn get_query_sql(query: usize) -> Result<Vec<String>> {
                         .map(|s| s.trim())
                         .filter(|s| !s.is_empty())
                         .map(|s| s.to_string())
-                        .collect())
+                        .collect());
                 }
                 Err(e) => errors.push(format!("{filename}: {e}")),
             };
@@ -771,7 +771,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
                     other => {
                         return Err(DataFusionError::NotImplemented(format!(
                             "Invalid compression format: {other}"
-                        )))
+                        )));
                     }
                 };
                 let props = WriterProperties::builder()
@@ -782,7 +782,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
             other => {
                 return Err(DataFusionError::NotImplemented(format!(
                     "Invalid output format: {other}"
-                )))
+                )));
             }
         }
         println!("Conversion completed in {} ms", start.elapsed().as_millis());
diff --git a/dev/msrvcheck/Cargo.toml b/dev/msrvcheck/Cargo.toml
index 9ab2e388..a7955f06 100644
--- a/dev/msrvcheck/Cargo.toml
+++ b/dev/msrvcheck/Cargo.toml
@@ -18,7 +18,7 @@
 # MSRV checker for upstream DataFusion
 [package]
 name = "msrvcheck"
-edition = "2021"
+edition = "2024"
 
 # See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
 
diff --git a/dev/msrvcheck/src/main.rs b/dev/msrvcheck/src/main.rs
index 1773a7c9..0a15ddbe 100644
--- a/dev/msrvcheck/src/main.rs
+++ b/dev/msrvcheck/src/main.rs
@@ -56,7 +56,7 @@ fn main() -> CargoResult<()> {
 
     for (package, version) in packages_with_rust_version {
         if let Some(v) = version {
-            if !v.is_compatible_with(project_msrv.as_partial()) {
+            if !project_msrv.is_compatible_with(v.as_partial()) {
                 panic!(
                     "package '{package}' has MSRV {v} not compatible with 
current project MSRV {project_msrv}",
                 );
diff --git a/examples/examples/custom-executor.rs 
b/examples/examples/custom-executor.rs
index 6e0e6c69..1f07bd76 100644
--- a/examples/examples/custom-executor.rs
+++ b/examples/examples/custom-executor.rs
@@ -20,7 +20,7 @@ use ballista_core::object_store::{
 };
 
 use ballista_executor::executor_process::{
-    start_executor_process, ExecutorProcessConfig,
+    ExecutorProcessConfig, start_executor_process,
 };
 use std::sync::Arc;
 ///
diff --git a/examples/examples/remote-dataframe.rs 
b/examples/examples/remote-dataframe.rs
index 53fb4adf..2837f0a8 100644
--- a/examples/examples/remote-dataframe.rs
+++ b/examples/examples/remote-dataframe.rs
@@ -20,7 +20,7 @@ use ballista_examples::test_util;
 use datafusion::{
     common::Result,
     execution::SessionStateBuilder,
-    prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext},
+    prelude::{ParquetReadOptions, SessionConfig, SessionContext, col, lit},
 };
 
 /// This example demonstrates executing a simple query against an Arrow data 
source (Parquet) and
diff --git a/examples/examples/standalone-sql.rs 
b/examples/examples/standalone-sql.rs
index 6c957668..53cdf208 100644
--- a/examples/examples/standalone-sql.rs
+++ b/examples/examples/standalone-sql.rs
@@ -17,7 +17,7 @@
 
 use ballista::datafusion::{
     common::Result,
-    execution::{options::ParquetReadOptions, SessionStateBuilder},
+    execution::{SessionStateBuilder, options::ParquetReadOptions},
     prelude::{SessionConfig, SessionContext},
 };
 use ballista::prelude::{SessionConfigExt, SessionContextExt};
diff --git a/examples/tests/common/mod.rs b/examples/tests/common/mod.rs
index e9aabb58..2c2f65ca 100644
--- a/examples/tests/common/mod.rs
+++ b/examples/tests/common/mod.rs
@@ -17,7 +17,7 @@
 
 use ballista::prelude::SessionConfigExt;
 use ballista_core::serde::{
-    protobuf::scheduler_grpc_client::SchedulerGrpcClient, BallistaCodec,
+    BallistaCodec, protobuf::scheduler_grpc_client::SchedulerGrpcClient,
 };
 use ballista_core::{ConfigProducer, RuntimeProducer};
 use ballista_scheduler::SessionBuilder;
@@ -25,8 +25,8 @@ use datafusion::execution::SessionState;
 use datafusion::prelude::SessionConfig;
 use object_store::aws::AmazonS3Builder;
 use testcontainers_modules::minio::MinIO;
-use testcontainers_modules::testcontainers::core::{CmdWaitFor, ExecCommand};
 use testcontainers_modules::testcontainers::ContainerRequest;
+use testcontainers_modules::testcontainers::core::{CmdWaitFor, ExecCommand};
 use testcontainers_modules::{minio, testcontainers::ImageExt};
 
 pub const REGION: &str = "eu-west-1";
diff --git a/examples/tests/object_store.rs b/examples/tests/object_store.rs
index b45a08ee..b79712ea 100644
--- a/examples/tests/object_store.rs
+++ b/examples/tests/object_store.rs
@@ -221,11 +221,11 @@ mod custom_s3_config {
     use crate::common::{ACCESS_KEY_ID, SECRET_KEY};
     use ballista::extension::SessionContextExt;
     use ballista::prelude::SessionConfigExt;
-    use ballista_core::object_store::{CustomObjectStoreRegistry, S3Options};
     use ballista_core::RuntimeProducer;
+    use ballista_core::object_store::{CustomObjectStoreRegistry, S3Options};
     use ballista_examples::test_util::examples_test_data;
-    use datafusion::execution::runtime_env::RuntimeEnvBuilder;
     use datafusion::execution::SessionState;
+    use datafusion::execution::runtime_env::RuntimeEnvBuilder;
     use datafusion::prelude::SessionConfig;
     use datafusion::{assert_batches_eq, prelude::SessionContext};
     use datafusion::{error::DataFusionError, execution::SessionStateBuilder};
@@ -233,8 +233,8 @@ mod custom_s3_config {
     use testcontainers_modules::testcontainers::runners::AsyncRunner;
 
     #[tokio::test]
-    async fn should_configure_s3_execute_sql_write_remote(
-    ) -> datafusion::error::Result<()> {
+    async fn should_configure_s3_execute_sql_write_remote()
+    -> datafusion::error::Result<()> {
         let test_data = examples_test_data();
 
         //
@@ -378,8 +378,8 @@ mod custom_s3_config {
     // SessionConfig propagation across ballista cluster.
 
     #[tokio::test]
-    async fn should_configure_s3_execute_sql_write_standalone(
-    ) -> datafusion::error::Result<()> {
+    async fn should_configure_s3_execute_sql_write_standalone()
+    -> datafusion::error::Result<()> {
         let test_data = examples_test_data();
 
         //
diff --git a/rustfmt.toml b/rustfmt.toml
index b5292068..3b15ecf6 100644
--- a/rustfmt.toml
+++ b/rustfmt.toml
@@ -15,6 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-edition = "2021"
+edition = "2024"
 max_width = 90
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to