This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 7cc56ab7 feat: update rust edition to 2024 (#1355)
7cc56ab7 is described below
commit 7cc56ab7d8dbbbc974330c48cc91ce3a8c0d58e4
Author: jgrim <[email protected]>
AuthorDate: Fri Jan 2 13:11:22 2026 +0100
feat: update rust edition to 2024 (#1355)
* feat: update rust edition to 2024
* fix: correct msrvcheck
* migrate resolver to v3 (2024 edition default)
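Most of the diff below is mechanical fallout from the edition bump: rustfmt's
2024 style edition re-sorts imports (uppercase names now sort before lowercase,
so CamelCase types precede snake_case module paths) and reflows long macro
calls, and several nested `if let` blocks are collapsed into let chains, which
are only accepted on edition 2024 (stable since Rust 1.88, matching the
workspace MSRV). A minimal sketch of the let-chain rewrite, using hypothetical
names rather than code from this repository:

    use std::collections::HashMap;

    // Edition 2021: a nested `if let` wrapping an inner condition.
    fn lookup_positive_2021(map: &HashMap<String, i32>, key: &str) -> Option<i32> {
        if let Some(v) = map.get(key) {
            if *v > 0 {
                return Some(*v);
            }
        }
        None
    }

    // Edition 2024: the same logic flattened into a single let chain.
    fn lookup_positive_2024(map: &HashMap<String, i32>, key: &str) -> Option<i32> {
        if let Some(v) = map.get(key)
            && *v > 0
        {
            return Some(*v);
        }
        None
    }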
---
Cargo.toml | 4 +-
ballista-cli/src/command.rs | 2 +-
ballista-cli/src/exec.rs | 4 +-
ballista-cli/src/main.rs | 2 +-
ballista/client/tests/bugs.rs | 16 +-
ballista/client/tests/common/mod.rs | 2 +-
ballista/client/tests/context_setup.rs | 12 +-
ballista/core/src/client.rs | 6 +-
ballista/core/src/consistent_hash/mod.rs | 18 ++-
ballista/core/src/diagram.rs | 4 +-
ballista/core/src/event_loop.rs | 2 +-
.../core/src/execution_plans/distributed_query.rs | 8 +-
.../core/src/execution_plans/shuffle_reader.rs | 23 +--
.../core/src/execution_plans/shuffle_writer.rs | 6 +-
ballista/core/src/extension.rs | 11 +-
ballista/core/src/object_store.rs | 11 +-
ballista/core/src/planner.rs | 4 +-
ballista/core/src/serde/mod.rs | 8 +-
ballista/core/src/serde/scheduler/from_proto.rs | 34 ++---
ballista/core/src/serde/scheduler/to_proto.rs | 2 +-
ballista/core/src/utils.rs | 4 +-
ballista/executor/src/bin/main.rs | 2 +-
ballista/executor/src/collect.rs | 2 +-
ballista/executor/src/execution_engine.rs | 2 +-
ballista/executor/src/execution_loop.rs | 12 +-
ballista/executor/src/executor.rs | 6 +-
ballista/executor/src/executor_process.rs | 24 +--
ballista/executor/src/executor_server.rs | 22 +--
ballista/executor/src/flight_service.rs | 6 +-
ballista/executor/src/lib.rs | 4 +-
ballista/executor/src/standalone.rs | 8 +-
ballista/scheduler/src/api/handlers.rs | 8 +-
ballista/scheduler/src/api/mod.rs | 2 +-
ballista/scheduler/src/cluster/event/mod.rs | 4 +-
ballista/scheduler/src/cluster/memory.rs | 23 +--
ballista/scheduler/src/cluster/mod.rs | 16 +-
ballista/scheduler/src/cluster/test_util/mod.rs | 2 +-
ballista/scheduler/src/config.rs | 4 +-
ballista/scheduler/src/display.rs | 2 +-
ballista/scheduler/src/metrics/prometheus.rs | 4 +-
.../src/physical_optimizer/join_selection.rs | 30 ++--
ballista/scheduler/src/planner.rs | 6 +-
ballista/scheduler/src/scheduler_process.rs | 4 +-
.../src/scheduler_server/external_scaler.rs | 6 +-
ballista/scheduler/src/scheduler_server/grpc.rs | 41 ++---
ballista/scheduler/src/scheduler_server/mod.rs | 25 +--
.../src/scheduler_server/query_stage_scheduler.rs | 13 +-
ballista/scheduler/src/standalone.rs | 10 +-
.../scheduler/src/state/distributed_explain.rs | 2 +-
ballista/scheduler/src/state/execution_graph.rs | 167 ++++++++++++---------
.../scheduler/src/state/execution_graph_dot.rs | 10 +-
ballista/scheduler/src/state/execution_stage.rs | 33 +++-
ballista/scheduler/src/state/executor_manager.rs | 13 +-
ballista/scheduler/src/state/mod.rs | 24 +--
ballista/scheduler/src/state/session_manager.rs | 4 +-
ballista/scheduler/src/state/task_manager.rs | 26 +++-
ballista/scheduler/src/test_utils.rs | 20 +--
benchmarks/Cargo.toml | 2 +-
benchmarks/src/bin/tpch.rs | 16 +-
dev/msrvcheck/Cargo.toml | 2 +-
dev/msrvcheck/src/main.rs | 2 +-
examples/examples/custom-executor.rs | 2 +-
examples/examples/remote-dataframe.rs | 2 +-
examples/examples/standalone-sql.rs | 2 +-
examples/tests/common/mod.rs | 4 +-
examples/tests/object_store.rs | 12 +-
rustfmt.toml | 2 +-
67 files changed, 440 insertions(+), 376 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index e9dc469c..a7c4574c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,14 +18,14 @@
[workspace]
exclude = ["dev/msrvcheck", "python"]
members = ["ballista-cli", "ballista/client", "ballista/core",
"ballista/executor", "ballista/scheduler", "benchmarks", "examples"]
-resolver = "2"
+resolver = "3"
[workspace.package]
# edition to be changed to 2024 when we update
# Minimum Supported Rust Version (MSRV) to 1.85.0
# which is datafusion 49
#
-edition = "2021"
+edition = "2024"
# we should try to follow datafusion version
rust-version = "1.88.0"
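For reference, resolver "3" is the MSRV-aware dependency resolver that becomes
the default under edition 2024: when selecting dependency versions, Cargo
prefers releases whose own rust-version fits the MSRV declared above. A
condensed sketch of the resulting workspace keys (not the full manifest):

    [workspace]
    resolver = "3"            # MSRV-aware resolution, the edition 2024 default

    [workspace.package]
    edition = "2024"          # requires rustc 1.85 or newer
    rust-version = "1.88.0"   # let chains used in this diff need 1.88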
diff --git a/ballista-cli/src/command.rs b/ballista-cli/src/command.rs
index ebd4c787..95ed32ce 100644
--- a/ballista-cli/src/command.rs
+++ b/ballista-cli/src/command.rs
@@ -28,7 +28,7 @@ use datafusion::common::Result;
use datafusion::error::DataFusionError;
use datafusion::prelude::SessionContext;
-use crate::functions::{display_all_functions, Function};
+use crate::functions::{Function, display_all_functions};
use crate::print_format::PrintFormat;
use crate::print_options::PrintOptions;
diff --git a/ballista-cli/src/exec.rs b/ballista-cli/src/exec.rs
index efbd94fa..887728f6 100644
--- a/ballista-cli/src/exec.rs
+++ b/ballista-cli/src/exec.rs
@@ -18,15 +18,15 @@
//! Execution functions
use std::fs::File;
-use std::io::prelude::*;
use std::io::BufReader;
+use std::io::prelude::*;
use std::sync::Arc;
use std::time::Instant;
use datafusion::common::Result;
use datafusion::prelude::SessionContext;
-use rustyline::error::ReadlineError;
use rustyline::Editor;
+use rustyline::error::ReadlineError;
use crate::{
command::{Command, OutputFormat},
diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs
index 098d23a4..2a6c691d 100644
--- a/ballista-cli/src/main.rs
+++ b/ballista-cli/src/main.rs
@@ -20,7 +20,7 @@ use std::{env, sync::Arc};
use ballista::{extension::SessionConfigExt, prelude::SessionContextExt};
use ballista_cli::{
- exec, print_format::PrintFormat, print_options::PrintOptions, BALLISTA_CLI_VERSION,
+ BALLISTA_CLI_VERSION, exec, print_format::PrintFormat, print_options::PrintOptions,
};
use clap::Parser;
use datafusion::{
diff --git a/ballista/client/tests/bugs.rs b/ballista/client/tests/bugs.rs
index 02b21da2..0c42fefe 100644
--- a/ballista/client/tests/bugs.rs
+++ b/ballista/client/tests/bugs.rs
@@ -130,14 +130,14 @@ order by sum_sales - avg_monthly_sales, s_store_name
let result = ctx.sql(query).await?.collect().await?;
let expected = [
- "+-------------+-----------+----------------+----------------+--------+-------+--------------------+-----------+---------+---------+",
- "| i_category | i_brand | s_store_name | s_company_name | d_year | d_moy | avg_monthly_sales | sum_sales | psum | nsum |",
- "+-------------+-----------+----------------+----------------+--------+-------+--------------------+-----------+---------+---------+",
- "| Electronics | TechBrand | Downtown Store | Retail Corp | 1999 | 4 | 1499.9850000000001 | 999.99 | 999.99 | 999.99 |",
- "| Electronics | TechBrand | Downtown Store | Retail Corp | 1999 | 3 | 1499.9850000000001 | 999.99 | 1999.98 | 999.99 |",
- "| Electronics | TechBrand | Downtown Store | Retail Corp | 1999 | 1 | 1499.9850000000001 | 1999.98 | 1999.98 | 1999.98 |",
- "| Electronics | TechBrand | Downtown Store | Retail Corp | 1999 | 2 | 1499.9850000000001 | 1999.98 | 1999.98 | 999.99 |",
- "+-------------+-----------+----------------+----------------+--------+-------+--------------------+-----------+---------+---------+",
+ "+-------------+-----------+----------------+----------------+--------+-------+--------------------+-----------+---------+---------+",
+ "| i_category | i_brand | s_store_name | s_company_name | d_year | d_moy | avg_monthly_sales | sum_sales | psum | nsum |",
+ "+-------------+-----------+----------------+----------------+--------+-------+--------------------+-----------+---------+---------+",
+ "| Electronics | TechBrand | Downtown Store | Retail Corp | 1999 | 4 | 1499.9850000000001 | 999.99 | 999.99 | 999.99 |",
+ "| Electronics | TechBrand | Downtown Store | Retail Corp | 1999 | 3 | 1499.9850000000001 | 999.99 | 1999.98 | 999.99 |",
+ "| Electronics | TechBrand | Downtown Store | Retail Corp | 1999 | 1 | 1499.9850000000001 | 1999.98 | 1999.98 | 1999.98 |",
+ "| Electronics | TechBrand | Downtown Store | Retail Corp | 1999 | 2 | 1499.9850000000001 | 1999.98 | 1999.98 | 999.99 |",
+ "+-------------+-----------+----------------+----------------+--------+-------+--------------------+-----------+---------+---------+",
];
assert_batches_eq!(expected, &result);
diff --git a/ballista/client/tests/common/mod.rs b/ballista/client/tests/common/mod.rs
index 85cb3fbc..9d940be6 100644
--- a/ballista/client/tests/common/mod.rs
+++ b/ballista/client/tests/common/mod.rs
@@ -21,7 +21,7 @@ use std::path::PathBuf;
use ballista::prelude::{SessionConfigExt, SessionContextExt};
use ballista_core::serde::{
- protobuf::scheduler_grpc_client::SchedulerGrpcClient, BallistaCodec,
+ BallistaCodec, protobuf::scheduler_grpc_client::SchedulerGrpcClient,
};
use ballista_core::{ConfigProducer, RuntimeProducer};
use ballista_scheduler::SessionBuilder;
diff --git a/ballista/client/tests/context_setup.rs b/ballista/client/tests/context_setup.rs
index e1a49bb0..df999c06 100644
--- a/ballista/client/tests/context_setup.rs
+++ b/ballista/client/tests/context_setup.rs
@@ -103,7 +103,7 @@ mod remote {
#[cfg(feature = "standalone")]
mod standalone {
- use std::sync::{atomic::AtomicBool, Arc};
+ use std::sync::{Arc, atomic::AtomicBool};
use ballista::extension::{SessionConfigExt, SessionContextExt};
use ballista_core::serde::BallistaPhysicalExtensionCodec;
@@ -111,7 +111,7 @@ mod standalone {
assert_batches_eq,
common::exec_err,
execution::{
- context::QueryPlanner, SessionState, SessionStateBuilder, TaskContext,
+ SessionState, SessionStateBuilder, TaskContext, context::QueryPlanner,
},
logical_expr::LogicalPlan,
physical_plan::ExecutionPlan,
@@ -226,9 +226,11 @@ mod standalone {
.collect()
.await;
- assert!(physical_codec
- .invoked
- .load(std::sync::atomic::Ordering::Relaxed));
+ assert!(
+ physical_codec
+ .invoked
+ .load(std::sync::atomic::Ordering::Relaxed)
+ );
Ok(())
}
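(The multi-line assert! above appears to be rustfmt's 2024 style edition at
work: a single long macro argument now moves onto its own indented block
instead of hanging off the opening line. The same reflow recurs in
consistent_hash, extension, and the scheduler's grpc code below.)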
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index 1b354346..bc3659d2 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -29,9 +29,9 @@ use crate::error::{BallistaError, Result as BResult};
use crate::serde::scheduler::{Action, PartitionId};
use arrow_flight;
-use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::Ticket;
-use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
+use arrow_flight::utils::flight_data_to_arrow_batch;
+use arrow_flight::{FlightData, flight_service_client::FlightServiceClient};
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::buffer::{Buffer, MutableBuffer};
use datafusion::arrow::ipc::convert::try_schema_from_ipc_buffer;
@@ -45,7 +45,7 @@ use datafusion::error::DataFusionError;
use datafusion::error::Result;
use crate::serde::protobuf;
-use crate::utils::{create_grpc_client_connection, GrpcClientConfig};
+use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use log::{debug, warn};
diff --git a/ballista/core/src/consistent_hash/mod.rs b/ballista/core/src/consistent_hash/mod.rs
index 51fa8add..73767e56 100644
--- a/ballista/core/src/consistent_hash/mod.rs
+++ b/ballista/core/src/consistent_hash/mod.rs
@@ -153,10 +153,10 @@ where
.range(hashed_key..)
.chain(self.virtual_nodes.iter())
{
- if let Some((node, _)) = self.node_replicas.get(node_name) {
- if node.is_valid() {
- return Some(position_key.clone());
- }
+ if let Some((node, _)) = self.node_replicas.get(node_name)
+ && node.is_valid()
+ {
+ return Some(position_key.clone());
}
if tolerance == 0 {
return None;
@@ -177,8 +177,8 @@ pub fn md5_hash(data: &[u8]) -> Vec<u8> {
#[cfg(test)]
mod test {
- use crate::consistent_hash::node::Node;
use crate::consistent_hash::ConsistentHash;
+ use crate::consistent_hash::node::Node;
#[test]
fn test_topology() {
@@ -219,9 +219,11 @@ mod test {
for (i, key) in keys.iter().enumerate() {
if i == 2 {
assert!(consistent_hash.get(key.as_bytes()).is_none());
- assert!(consistent_hash
- .get_with_tolerance(key.as_bytes(), 1)
- .is_some());
+ assert!(
+ consistent_hash
+ .get_with_tolerance(key.as_bytes(), 1)
+ .is_some()
+ );
} else {
assert_eq!(
consistent_hash.get(key.as_bytes()).unwrap().name(),
diff --git a/ballista/core/src/diagram.rs b/ballista/core/src/diagram.rs
index b316d7c9..1af9e08f 100644
--- a/ballista/core/src/diagram.rs
+++ b/ballista/core/src/diagram.rs
@@ -19,6 +19,7 @@ use crate::error::Result;
use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
use datafusion::datasource::source::DataSourceExec;
+use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -26,12 +27,11 @@ use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
-use datafusion::physical_plan::ExecutionPlan;
use log::warn;
use std::fs::File;
use std::io::{BufWriter, Write};
-use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
let write_file = File::create(filename)?;
diff --git a/ballista/core/src/event_loop.rs b/ballista/core/src/event_loop.rs
index c82cc231..be970123 100644
--- a/ballista/core/src/event_loop.rs
+++ b/ballista/core/src/event_loop.rs
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
use async_trait::async_trait;
use log::{error, info};
diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs
index 4336b1fc..8a084f96 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -19,11 +19,11 @@ use crate::client::BallistaClient;
use crate::config::BallistaConfig;
use crate::serde::protobuf::SuccessfulJob;
use crate::serde::protobuf::{
- execute_query_params::Query, execute_query_result, job_status,
- scheduler_grpc_client::SchedulerGrpcClient, ExecuteQueryParams, GetJobStatusParams,
- GetJobStatusResult, KeyValuePair, PartitionLocation,
+ ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
+ PartitionLocation, execute_query_params::Query, execute_query_result, job_status,
+ scheduler_grpc_client::SchedulerGrpcClient,
};
-use crate::utils::{create_grpc_client_connection, GrpcClientConfig};
+use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index 0635d310..2ebc2fd3 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -52,7 +52,7 @@ use itertools::Itertools;
use log::{debug, error, trace};
use rand::prelude::SliceRandom;
use rand::rng;
-use tokio::sync::{mpsc, Semaphore};
+use tokio::sync::{Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;
/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
@@ -165,9 +165,9 @@ impl ExecutionPlan for ShuffleReaderExec {
if force_remote_read {
debug!(
- "All shuffle partitions will be read as remote partitions! To
disable this behavior set: `{}=false`",
- crate::config::BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ
- );
+ "All shuffle partitions will be read as remote partitions! To
disable this behavior set: `{}=false`",
+ crate::config::BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ
+ );
}
log::debug!(
@@ -223,9 +223,7 @@ impl ExecutionPlan for ShuffleReaderExec {
trace!(
"shuffle reader at stage: {} and partition {} returned
statistics: {:?}",
- self.stage_id,
- idx,
- stat_for_partition
+ self.stage_id, idx, stat_for_partition
);
stat_for_partition
} else {
@@ -236,7 +234,10 @@ impl ExecutionPlan for ShuffleReaderExec {
.flatten()
.map(|loc| loc.partition_stats),
);
- trace!("shuffle reader at stage: {} returned statistics for all
partitions: {:?}", self.stage_id, stats_for_partitions);
+ trace!(
+ "shuffle reader at stage: {} returned statistics for all
partitions: {:?}",
+ self.stage_id, stats_for_partitions
+ );
Ok(stats_for_partitions)
}
}
@@ -575,7 +576,7 @@ mod tests {
use datafusion::physical_plan::common;
use datafusion::prelude::SessionContext;
- use tempfile::{tempdir, TempDir};
+ use tempfile::{TempDir, tempdir};
#[tokio::test]
async fn test_stats_for_partitions_empty() {
@@ -740,8 +741,8 @@ mod tests {
}
#[tokio::test]
- async fn test_stats_for_partition_statistics_specific_partition_out_of_range(
- ) -> Result<()> {
+ async fn test_stats_for_partition_statistics_specific_partition_out_of_range()
+ -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs
index c1be52e3..42cfe03d 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -20,8 +20,8 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.
-use datafusion::arrow::ipc::writer::IpcWriteOptions;
use datafusion::arrow::ipc::CompressionType;
+use datafusion::arrow::ipc::writer::IpcWriteOptions;
use datafusion::arrow::ipc::writer::StreamWriter;
use std::any::Any;
@@ -51,8 +51,8 @@ use datafusion::physical_plan::metrics::{
};
use datafusion::physical_plan::{
- displayable, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
- PlanProperties, SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+ SendableRecordBatchStream, Statistics, displayable,
};
use futures::{StreamExt, TryFutureExt, TryStreamExt};
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index 49bff2de..f2690174 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -16,9 +16,10 @@
// under the License.
use crate::config::{
- BallistaConfig, BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME,
+ BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME,
BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ, BALLISTA_SHUFFLE_READER_MAX_REQUESTS,
BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT, BALLISTA_STANDALONE_PARALLELISM,
+ BallistaConfig,
};
use crate::planner::BallistaQueryPlanner;
use crate::serde::protobuf::KeyValuePair;
@@ -563,8 +564,10 @@ mod test {
let pairs = config.to_key_value_pairs();
assert!(pairs.iter().any(|p| p.key == BALLISTA_JOB_NAME));
- assert!(pairs
- .iter()
- .any(|p| p.key == "datafusion.catalog.information_schema"))
+ assert!(
+ pairs
+ .iter()
+ .any(|p| p.key == "datafusion.catalog.information_schema")
+ )
}
}
diff --git a/ballista/core/src/object_store.rs b/ballista/core/src/object_store.rs
index b698ebdc..3f752f9a 100644
--- a/ballista/core/src/object_store.rs
+++ b/ballista/core/src/object_store.rs
@@ -196,10 +196,13 @@ impl CustomObjectStoreRegistry {
}
if let Some(endpoint) = endpoint {
- if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
- if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
- return config_err!("Invalid endpoint: {endpoint}. HTTP is not allowed for S3 endpoints. To allow HTTP, set 's3.allow_http' to true");
- }
+ if let Ok(endpoint_url) = Url::try_from(endpoint.as_str())
+ && !matches!(allow_http, Some(true))
+ && endpoint_url.scheme() == "http"
+ {
+ return config_err!(
+ "Invalid endpoint: {endpoint}. HTTP is not allowed for S3
endpoints. To allow HTTP, set 's3.allow_http' to true"
+ );
}
builder = builder.with_endpoint(endpoint);
diff --git a/ballista/core/src/planner.rs b/ballista/core/src/planner.rs
index 266da3c6..bf9f5b7b 100644
--- a/ballista/core/src/planner.rs
+++ b/ballista/core/src/planner.rs
@@ -25,8 +25,8 @@ use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor};
use datafusion::error::DataFusionError;
use datafusion::execution::context::{QueryPlanner, SessionState};
use datafusion::logical_expr::{LogicalPlan, TableScan};
-use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion_proto::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
use std::marker::PhantomData;
@@ -180,7 +180,7 @@ mod test {
use datafusion::{
common::tree_node::TreeNode,
error::Result,
- execution::{runtime_env::RuntimeEnvBuilder, SessionStateBuilder},
+ execution::{SessionStateBuilder, runtime_env::RuntimeEnvBuilder},
prelude::{SessionConfig, SessionContext},
};
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index 08901682..c71c0b70 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -29,10 +29,10 @@ use datafusion_proto::logical_plan::file_formats::{
ArrowLogicalExtensionCodec, AvroLogicalExtensionCodec, CsvLogicalExtensionCodec,
JsonLogicalExtensionCodec, ParquetLogicalExtensionCodec,
};
+use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
use datafusion_proto::physical_plan::to_proto::serialize_partitioning;
-use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::protobuf::proto_error;
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
use datafusion_proto::{
@@ -491,12 +491,12 @@ struct FileFormatProto {
mod test {
use super::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
- use datafusion::physical_plan::expressions::col;
use datafusion::physical_plan::Partitioning;
+ use datafusion::physical_plan::expressions::col;
use datafusion::{
common::DFSchema,
- datasource::file_format::{parquet::ParquetFormatFactory, DefaultFileType},
- logical_expr::{dml::CopyTo, EmptyRelation, LogicalPlan},
+ datasource::file_format::{DefaultFileType, parquet::ParquetFormatFactory},
+ logical_expr::{EmptyRelation, LogicalPlan, dml::CopyTo},
prelude::SessionContext,
};
use datafusion_proto::{logical_plan::AsLogicalPlan, protobuf::LogicalPlanNode};
diff --git a/ballista/core/src/serde/scheduler/from_proto.rs b/ballista/core/src/serde/scheduler/from_proto.rs
index dc8c53c7..76c0e7d9 100644
--- a/ballista/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/core/src/serde/scheduler/from_proto.rs
@@ -41,9 +41,9 @@ use crate::serde::scheduler::{
TaskDefinition,
};
-use crate::serde::{protobuf, BallistaCodec};
use crate::RuntimeProducer;
-use protobuf::{operator_metric, NamedCount, NamedGauge, NamedTime};
+use crate::serde::{BallistaCodec, protobuf};
+use protobuf::{NamedCount, NamedGauge, NamedTime, operator_metric};
impl TryInto<Action> for protobuf::Action {
type Error = BallistaError;
@@ -90,11 +90,7 @@ impl Into<PartitionStats> for protobuf::PartitionStats {
}
fn foo(n: i64) -> Option<u64> {
- if n < 0 {
- None
- } else {
- Some(n as u64)
- }
+ if n < 0 { None } else { Some(n as u64) }
}
impl TryInto<PartitionLocation> for protobuf::PartitionLocation {
@@ -291,21 +287,17 @@ impl Into<ExecutorData> for protobuf::ExecutorData {
available_task_slots: 0,
};
for resource in self.resources {
- if let Some(task_slots) = resource.total {
- if let Some(protobuf::executor_resource::Resource::TaskSlots(
- task_slots,
- )) = task_slots.resource
- {
- ret.total_task_slots = task_slots
- }
+ if let Some(task_slots) = resource.total
+ && let Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
+ task_slots.resource
+ {
+ ret.total_task_slots = task_slots
};
- if let Some(task_slots) = resource.available {
- if let Some(protobuf::executor_resource::Resource::TaskSlots(
- task_slots,
- )) = task_slots.resource
- {
- ret.available_task_slots = task_slots
- }
+ if let Some(task_slots) = resource.available
+ && let Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
+ task_slots.resource
+ {
+ ret.available_task_slots = task_slots
};
}
ret
diff --git a/ballista/core/src/serde/scheduler/to_proto.rs b/ballista/core/src/serde/scheduler/to_proto.rs
index 5abb0992..01c9d20e 100644
--- a/ballista/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/core/src/serde/scheduler/to_proto.rs
@@ -29,7 +29,7 @@ use crate::serde::scheduler::{
PartitionLocation, PartitionStats,
};
use datafusion::physical_plan::Partitioning;
-use protobuf::{action::ActionType, operator_metric, NamedCount, NamedGauge, NamedTime};
+use protobuf::{NamedCount, NamedGauge, NamedTime, action::ActionType, operator_metric};
impl TryInto<protobuf::Action> for Action {
type Error = BallistaError;
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 248dbbe0..f77e1433 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -20,15 +20,15 @@ use crate::error::{BallistaError, Result};
use crate::extension::SessionConfigExt;
use crate::serde::scheduler::PartitionStats;
+use datafusion::arrow::ipc::CompressionType;
use datafusion::arrow::ipc::writer::IpcWriteOptions;
use datafusion::arrow::ipc::writer::StreamWriter;
-use datafusion::arrow::ipc::CompressionType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::context::{SessionConfig, SessionState};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_plan::metrics::MetricsSet;
-use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream};
+use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream, metrics};
use futures::StreamExt;
use log::error;
use std::sync::Arc;
diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs
index 96d3e1fd..cbeb93b8 100644
--- a/ballista/executor/src/bin/main.rs
+++ b/ballista/executor/src/bin/main.rs
@@ -23,7 +23,7 @@ use ballista_core::object_store::{
};
use ballista_executor::config::Config;
use ballista_executor::executor_process::{
- start_executor_process, ExecutorProcessConfig,
+ ExecutorProcessConfig, start_executor_process,
};
use clap::Parser;
use std::env;
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 84beff6e..2e7649de 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -30,8 +30,8 @@ use datafusion::physical_plan::{
SendableRecordBatchStream, Statistics,
};
use datafusion::{error::Result, physical_plan::RecordBatchStream};
-use futures::stream::SelectAll;
use futures::Stream;
+use futures::stream::SelectAll;
/// The CollectExec operator retrieves results from the cluster and returns
them as a single
/// vector of [RecordBatch].
diff --git a/ballista/executor/src/execution_engine.rs b/ballista/executor/src/execution_engine.rs
index c7f9c8a5..eabb515a 100644
--- a/ballista/executor/src/execution_engine.rs
+++ b/ballista/executor/src/execution_engine.rs
@@ -21,8 +21,8 @@ use ballista_core::serde::protobuf::ShuffleWritePartition;
use ballista_core::utils;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::metrics::MetricsSet;
use std::fmt::{Debug, Display};
use std::sync::Arc;
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 36b58a13..d97b9926 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -18,15 +18,15 @@
use crate::cpu_bound_executor::DedicatedExecutor;
use crate::executor::Executor;
use crate::executor_process::remove_job_dir;
-use crate::{as_task_status, TaskExecutionTimes};
+use crate::{TaskExecutionTimes, as_task_status};
use ballista_core::error::BallistaError;
use ballista_core::extension::SessionConfigHelperExt;
+use ballista_core::serde::BallistaCodec;
use ballista_core::serde::protobuf::{
- scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
- TaskDefinition, TaskStatus,
+ PollWorkParams, PollWorkResult, TaskDefinition, TaskStatus,
+ scheduler_grpc_client::SchedulerGrpcClient,
};
use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId};
-use ballista_core::serde::BallistaCodec;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -176,7 +176,9 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
}
}
Err(error) => {
- warn!("Executor poll work loop failed. If this continues to
happen the Scheduler might be marked as dead. Error: {error}");
+ warn!(
+ "Executor poll work loop failed. If this continues to
happen the Scheduler might be marked as dead. Error: {error}"
+ );
}
}
diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs
index 3723c42e..55b9bc8c 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -22,13 +22,13 @@ use crate::execution_engine::ExecutionEngine;
use crate::execution_engine::QueryStageExecutor;
use crate::metrics::ExecutorMetricsCollector;
use crate::metrics::LoggingMetricsCollector;
+use ballista_core::ConfigProducer;
+use ballista_core::RuntimeProducer;
use ballista_core::error::BallistaError;
use ballista_core::registry::BallistaFunctionRegistry;
use ballista_core::serde::protobuf;
use ballista_core::serde::protobuf::ExecutorRegistration;
use ballista_core::serde::scheduler::PartitionId;
-use ballista_core::ConfigProducer;
-use ballista_core::RuntimeProducer;
use dashmap::DashMap;
use datafusion::execution::context::TaskContext;
use datafusion::execution::runtime_env::RuntimeEnv;
@@ -215,11 +215,11 @@ impl Executor {
mod test {
use crate::execution_engine::DefaultQueryStageExec;
use crate::executor::Executor;
+ use ballista_core::RuntimeProducer;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf::ExecutorRegistration;
use ballista_core::serde::scheduler::PartitionId;
use ballista_core::utils::default_config_producer;
- use ballista_core::RuntimeProducer;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs
index 4756d89b..4daa53f3 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -19,16 +19,16 @@
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
-use std::sync::atomic::Ordering;
use std::sync::Arc;
+use std::sync::atomic::Ordering;
use std::time::{Duration, Instant, UNIX_EPOCH};
use arrow_flight::flight_service_server::FlightServiceServer;
use ballista_core::registry::BallistaFunctionRegistry;
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
-use futures::stream::FuturesUnordered;
use futures::StreamExt;
+use futures::stream::FuturesUnordered;
use log::{error, info, warn};
use tempfile::TempDir;
use tokio::fs::DirEntry;
@@ -46,17 +46,17 @@ use ballista_core::extension::SessionConfigExt;
use ballista_core::serde::protobuf::executor_resource::Resource;
use ballista_core::serde::protobuf::executor_status::Status;
use ballista_core::serde::protobuf::{
- scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration, ExecutorResource,
- ExecutorSpecification, ExecutorStatus, ExecutorStoppedParams, HeartBeatParams,
+ ExecutorRegistration, ExecutorResource, ExecutorSpecification, ExecutorStatus,
+ ExecutorStoppedParams, HeartBeatParams, scheduler_grpc_client::SchedulerGrpcClient,
};
use ballista_core::serde::{
BallistaCodec, BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec,
};
use ballista_core::utils::{
- create_grpc_client_connection, create_grpc_server, default_config_producer,
- get_time_before, GrpcServerConfig,
+ GrpcServerConfig, create_grpc_client_connection, create_grpc_server,
+ default_config_producer, get_time_before,
};
-use ballista_core::{ConfigProducer, RuntimeProducer, BALLISTA_VERSION};
+use ballista_core::{BALLISTA_VERSION, ConfigProducer, RuntimeProducer};
use crate::execution_engine::ExecutionEngine;
use crate::executor::{Executor, TasksDrainedFuture};
@@ -65,8 +65,8 @@ use crate::flight_service::BallistaFlightService;
use crate::metrics::LoggingMetricsCollector;
use crate::shutdown::Shutdown;
use crate::shutdown::ShutdownNotifier;
+use crate::{ArrowFlightServerProvider, terminate};
use crate::{execution_loop, executor_server};
-use crate::{terminate, ArrowFlightServerProvider};
pub struct ExecutorProcessConfig {
pub bind_host: String,
@@ -493,7 +493,9 @@ async fn flight_server_task(
grpc_server_config: GrpcServerConfig,
) -> JoinHandle<Result<(), BallistaError>> {
tokio::spawn(async move {
- info!("Built-in arrow flight server listening on: {address:?}
max_encoding_size: {max_encoding_message_size} max_decoding_size:
{max_decoding_message_size}");
+ info!(
+ "Built-in arrow flight server listening on: {address:?}
max_encoding_size: {max_encoding_message_size} max_decoding_size:
{max_decoding_message_size}"
+ );
let server_future = create_grpc_server(&grpc_server_config)
.add_service(
@@ -556,7 +558,9 @@ async fn clean_shuffle_data_loop(
Ok(_) => {}
}
} else {
- warn!("{child_path:?} under the working directory is a not a
directory and will be ignored when doing cleanup")
+ warn!(
+ "{child_path:?} under the working directory is a not a
directory and will be ignored when doing cleanup"
+ )
}
} else {
error!("Fail to get metadata for file {:?}", child.path())
diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index 1b40650e..ed2b2504 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -18,8 +18,8 @@
use ballista_core::BALLISTA_VERSION;
use std::collections::HashMap;
use std::convert::TryInto;
-use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
@@ -29,21 +29,21 @@ use tonic::{Request, Response, Status};
use ballista_core::error::BallistaError;
use ballista_core::extension::SessionConfigExt;
+use ballista_core::serde::BallistaCodec;
use ballista_core::serde::protobuf::{
- executor_grpc_server::{ExecutorGrpc, ExecutorGrpcServer},
- executor_metric, executor_status,
- scheduler_grpc_client::SchedulerGrpcClient,
CancelTasksParams, CancelTasksResult, ExecutorMetric, ExecutorStatus,
HeartBeatParams, LaunchMultiTaskParams, LaunchMultiTaskResult, LaunchTaskParams,
LaunchTaskResult, RegisterExecutorParams, RemoveJobDataParams, RemoveJobDataResult,
StopExecutorParams, StopExecutorResult, TaskStatus, UpdateTaskStatusParams,
+ executor_grpc_server::{ExecutorGrpc, ExecutorGrpcServer},
+ executor_metric, executor_status,
+ scheduler_grpc_client::SchedulerGrpcClient,
};
+use ballista_core::serde::scheduler::PartitionId;
+use ballista_core::serde::scheduler::TaskDefinition;
use ballista_core::serde::scheduler::from_proto::{
get_task_definition, get_task_definition_vec,
};
-use ballista_core::serde::scheduler::PartitionId;
-use ballista_core::serde::scheduler::TaskDefinition;
-use ballista_core::serde::BallistaCodec;
use ballista_core::utils::{create_grpc_client_connection, create_grpc_server};
use dashmap::DashMap;
use datafusion::execution::TaskContext;
@@ -53,9 +53,9 @@ use tokio::task::JoinHandle;
use crate::cpu_bound_executor::DedicatedExecutor;
use crate::executor::Executor;
-use crate::executor_process::{remove_job_dir, ExecutorProcessConfig};
+use crate::executor_process::{ExecutorProcessConfig, remove_job_dir};
use crate::shutdown::ShutdownNotifier;
-use crate::{as_task_status, TaskExecutionTimes};
+use crate::{TaskExecutionTimes, as_task_status};
type ServerHandle = JoinHandle<Result<(), BallistaError>>;
type SchedulerClients = Arc<DashMap<String, SchedulerGrpcClient<Channel>>>;
@@ -522,7 +522,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
break;
}
Err(TryRecvError::Disconnected) => {
- info!("Channel is closed and will exit the task
status report loop");
+ info!(
+ "Channel is closed and will exit the task
status report loop"
+ );
drop(tasks_status_complete);
return;
}
diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs
index 0de791f6..9ff762f0 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -31,9 +31,9 @@ use ballista_core::serde::scheduler::Action as BallistaAction;
use datafusion::arrow::ipc::CompressionType;
use arrow_flight::{
- flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
- FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
- PollInfo, PutResult, SchemaResult, Ticket,
+ Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
+ HandshakeRequest, HandshakeResponse, PollInfo, PutResult, SchemaResult, Ticket,
+ flight_service_server::FlightService,
};
use datafusion::arrow::ipc::writer::IpcWriteOptions;
use datafusion::arrow::{error::ArrowError, record_batch::RecordBatch};
diff --git a/ballista/executor/src/lib.rs b/ballista/executor/src/lib.rs
index e9322592..15f42a70 100644
--- a/ballista/executor/src/lib.rs
+++ b/ballista/executor/src/lib.rs
@@ -44,8 +44,8 @@ use log::info;
use crate::shutdown::Shutdown;
use ballista_core::serde::protobuf::{
- task_status, FailedTask, OperatorMetricsSet, ShuffleWritePartition, SuccessfulTask,
- TaskStatus,
+ FailedTask, OperatorMetricsSet, ShuffleWritePartition, SuccessfulTask, TaskStatus,
+ task_status,
};
use ballista_core::serde::scheduler::PartitionId;
use ballista_core::utils::GrpcServerConfig;
diff --git a/ballista/executor/src/standalone.rs b/ballista/executor/src/standalone.rs
index d9fa3c51..aa3b12b5 100644
--- a/ballista/executor/src/standalone.rs
+++ b/ballista/executor/src/standalone.rs
@@ -20,14 +20,14 @@ use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightSe
use arrow_flight::flight_service_server::FlightServiceServer;
use ballista_core::extension::SessionConfigExt;
use ballista_core::registry::BallistaFunctionRegistry;
-use ballista_core::utils::{default_config_producer, GrpcServerConfig};
+use ballista_core::utils::{GrpcServerConfig, default_config_producer};
use ballista_core::{
+ BALLISTA_VERSION,
error::Result,
- serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration},
- serde::scheduler::ExecutorSpecification,
serde::BallistaCodec,
+ serde::protobuf::{ExecutorRegistration, scheduler_grpc_client::SchedulerGrpcClient},
+ serde::scheduler::ExecutorSpecification,
utils::create_grpc_server,
- BALLISTA_VERSION,
};
use ballista_core::{ConfigProducer, RuntimeProducer};
use datafusion::execution::{SessionState, SessionStateBuilder};
diff --git a/ballista/scheduler/src/api/handlers.rs b/ballista/scheduler/src/api/handlers.rs
index 8b5ed367..3230a8be 100644
--- a/ballista/scheduler/src/api/handlers.rs
+++ b/ballista/scheduler/src/api/handlers.rs
@@ -10,17 +10,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::SchedulerServer;
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::state::execution_graph::ExecutionStage;
use crate::state::execution_graph_dot::ExecutionGraphDot;
use axum::{
+ Json,
extract::{Path, State},
response::{IntoResponse, Response},
- Json,
};
-use ballista_core::serde::protobuf::job_status::Status;
use ballista_core::BALLISTA_VERSION;
+use ballista_core::serde::protobuf::job_status::Status;
use datafusion::physical_plan::metrics::{MetricValue, MetricsSet, Time};
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -30,7 +30,7 @@ use graphviz_rust::{
exec,
printer::PrinterContext,
};
-use http::{header::CONTENT_TYPE, StatusCode};
+use http::{StatusCode, header::CONTENT_TYPE};
use std::sync::Arc;
use std::time::Duration;
diff --git a/ballista/scheduler/src/api/mod.rs b/ballista/scheduler/src/api/mod.rs
index 05cf2718..5dd1aee2 100644
--- a/ballista/scheduler/src/api/mod.rs
+++ b/ballista/scheduler/src/api/mod.rs
@@ -14,7 +14,7 @@ mod handlers;
use crate::scheduler_server::SchedulerServer;
use axum::routing::patch;
-use axum::{routing::get, Router};
+use axum::{Router, routing::get};
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
use std::sync::Arc;
diff --git a/ballista/scheduler/src/cluster/event/mod.rs b/ballista/scheduler/src/cluster/event/mod.rs
index 4c294491..659cc424 100644
--- a/ballista/scheduler/src/cluster/event/mod.rs
+++ b/ballista/scheduler/src/cluster/event/mod.rs
@@ -20,8 +20,8 @@ use log::debug;
use parking_lot::RwLock;
use std::collections::BTreeMap;
use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::TryRecvError;
@@ -157,8 +157,8 @@ impl<T: Clone> Stream for EventSubscriber<T> {
#[cfg(test)]
mod test {
use crate::cluster::event::{ClusterEventSender, EventSubscriber};
- use futures::stream::FuturesUnordered;
use futures::StreamExt;
+ use futures::stream::FuturesUnordered;
async fn collect_events<T: Clone>(mut rx: EventSubscriber<T>) -> Vec<T> {
let mut events = vec![];
diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs
index 4ea39dbb..c22ee31c 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -16,24 +16,25 @@
// under the License.
use crate::cluster::{
- bind_task_bias, bind_task_consistent_hash, bind_task_round_robin, get_scan_files,
- is_skip_consistent_hash, BoundTask, ClusterState, ExecutorSlot, JobState,
- JobStateEvent, JobStateEventStream, JobStatus, TaskDistributionPolicy, TopologyNode,
+ BoundTask, ClusterState, ExecutorSlot, JobState, JobStateEvent, JobStateEventStream,
+ JobStatus, TaskDistributionPolicy, TopologyNode, bind_task_bias,
+ bind_task_consistent_hash, bind_task_round_robin, get_scan_files,
+ is_skip_consistent_hash,
};
use crate::state::execution_graph::ExecutionGraph;
use async_trait::async_trait;
+use ballista_core::ConfigProducer;
use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::protobuf::{
- executor_status, AvailableTaskSlots, ExecutorHeartbeat, ExecutorStatus, FailedJob,
- QueuedJob,
+ AvailableTaskSlots, ExecutorHeartbeat, ExecutorStatus, FailedJob, QueuedJob,
+ executor_status,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
-use ballista_core::ConfigProducer;
use dashmap::DashMap;
use datafusion::prelude::{SessionConfig, SessionContext};
use crate::cluster::event::ClusterEventSender;
-use crate::scheduler_server::{timestamp_millis, timestamp_secs, SessionBuilder};
+use crate::scheduler_server::{SessionBuilder, timestamp_millis, timestamp_secs};
use crate::state::session_manager::create_datafusion_context;
use crate::state::task_manager::JobInfoCache;
use ballista_core::serde::protobuf::job_status::Status;
@@ -69,10 +70,10 @@ impl InMemoryClusterState {
) -> HashMap<String, TopologyNode> {
let mut nodes: HashMap<String, TopologyNode> = HashMap::new();
for (executor_id, slots) in guard.iter() {
- if let Some(executors) = executors.as_ref() {
- if !executors.contains(executor_id) {
- continue;
- }
+ if let Some(executors) = executors.as_ref()
+ && !executors.contains(executor_id)
+ {
+ continue;
}
if let Some(executor) = self.executors.get(&slots.executor_id) {
let node = TopologyNode::new(
diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs
index 4942d452..d9ea9e54 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -33,17 +33,17 @@ use log::debug;
use ballista_core::consistent_hash::ConsistentHash;
use ballista_core::error::Result;
use ballista_core::serde::protobuf::{
- job_status, AvailableTaskSlots, ExecutorHeartbeat, JobStatus,
+ AvailableTaskSlots, ExecutorHeartbeat, JobStatus, job_status,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata, PartitionId};
use ballista_core::utils::{default_config_producer, default_session_builder};
-use ballista_core::{consistent_hash, ConfigProducer};
+use ballista_core::{ConfigProducer, consistent_hash};
use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
use crate::config::{SchedulerConfig, TaskDistributionPolicy};
use crate::scheduler_server::SessionBuilder;
-use crate::state::execution_graph::{create_task_info, ExecutionGraph, TaskDescription};
+use crate::state::execution_graph::{ExecutionGraph, TaskDescription, create_task_info};
use crate::state::task_manager::JobInfoCache;
pub mod event;
@@ -532,7 +532,9 @@ pub(crate) async fn bind_task_consistent_hash(
total_slots += node.available_slots as usize;
}
if total_slots == 0 {
- debug!("Not enough available executor slots for binding tasks with
consistent hashing policy!!!");
+ debug!(
+ "Not enough available executor slots for binding tasks with
consistent hashing policy!!!"
+ );
return Ok((vec![], None));
}
debug!("Total slot number for consistent hash binding is {total_slots}");
@@ -704,16 +706,16 @@ mod test {
use std::sync::Arc;
use datafusion::datasource::listing::PartitionedFile;
- use object_store::path::Path;
use object_store::ObjectMeta;
+ use object_store::path::Path;
use ballista_core::error::Result;
use ballista_core::serde::protobuf::AvailableTaskSlots;
use ballista_core::serde::scheduler::{ExecutorMetadata, ExecutorSpecification};
use crate::cluster::{
- bind_task_bias, bind_task_consistent_hash, bind_task_round_robin, BoundTask,
- TopologyNode,
+ BoundTask, TopologyNode, bind_task_bias, bind_task_consistent_hash,
+ bind_task_round_robin,
};
use crate::state::execution_graph::ExecutionGraph;
use crate::state::task_manager::JobInfoCache;
diff --git a/ballista/scheduler/src/cluster/test_util/mod.rs b/ballista/scheduler/src/cluster/test_util/mod.rs
index 64da6c60..70269563 100644
--- a/ballista/scheduler/src/cluster/test_util/mod.rs
+++ b/ballista/scheduler/src/cluster/test_util/mod.rs
@@ -20,8 +20,8 @@ use crate::scheduler_server::timestamp_millis;
use crate::state::execution_graph::ExecutionGraph;
use crate::test_utils::{await_condition, mock_completed_task, mock_executor};
use ballista_core::error::Result;
-use ballista_core::serde::protobuf::job_status::Status;
use ballista_core::serde::protobuf::JobStatus;
+use ballista_core::serde::protobuf::job_status::Status;
use futures::StreamExt;
use std::sync::Arc;
use std::time::Duration;
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 559f1d67..9b055340 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -25,9 +25,9 @@
//
#![allow(clippy::uninlined_format_args)]
-use crate::cluster::DistributionPolicy;
use crate::SessionBuilder;
-use ballista_core::{config::TaskSchedulingPolicy, ConfigProducer};
+use crate::cluster::DistributionPolicy;
+use ballista_core::{ConfigProducer, config::TaskSchedulingPolicy};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use std::fmt::Display;
diff --git a/ballista/scheduler/src/display.rs b/ballista/scheduler/src/display.rs
index 5cdd8802..75f85a6f 100644
--- a/ballista/scheduler/src/display.rs
+++ b/ballista/scheduler/src/display.rs
@@ -23,7 +23,7 @@ use ballista_core::utils::collect_plan_metrics;
use datafusion::logical_expr::{StringifiedPlan, ToStringifiedPlan};
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::{
- accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor,
+ DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, accept,
};
use log::{error, info};
use std::fmt;
diff --git a/ballista/scheduler/src/metrics/prometheus.rs b/ballista/scheduler/src/metrics/prometheus.rs
index 9c26145b..032f0f82 100644
--- a/ballista/scheduler/src/metrics/prometheus.rs
+++ b/ballista/scheduler/src/metrics/prometheus.rs
@@ -20,8 +20,8 @@ use ballista_core::error::{BallistaError, Result};
use once_cell::sync::OnceCell;
use prometheus::{
- register_counter_with_registry, register_gauge_with_registry,
- register_histogram_with_registry, Counter, Gauge, Histogram, Registry,
+ Counter, Gauge, Histogram, Registry, register_counter_with_registry,
+ register_gauge_with_registry, register_histogram_with_registry,
};
use prometheus::{Encoder, TextEncoder};
use std::sync::Arc;
diff --git a/ballista/scheduler/src/physical_optimizer/join_selection.rs b/ballista/scheduler/src/physical_optimizer/join_selection.rs
index 3fb4d60b..7873ac3e 100644
--- a/ballista/scheduler/src/physical_optimizer/join_selection.rs
+++ b/ballista/scheduler/src/physical_optimizer/join_selection.rs
@@ -36,10 +36,10 @@ use datafusion::common::config::ConfigOptions;
use datafusion::common::error::Result;
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion::common::{internal_err, JoinSide, JoinType};
+use datafusion::common::{JoinSide, JoinType, internal_err};
use datafusion::logical_expr::sort_properties::SortProperties;
-use datafusion::physical_expr::expressions::Column;
use datafusion::physical_expr::LexOrdering;
+use datafusion::physical_expr::expressions::Column;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
@@ -544,19 +544,15 @@ pub fn hash_join_swap_subrule(
mut input: Arc<dyn ExecutionPlan>,
_config_options: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
- if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
- if hash_join.left.boundedness().is_unbounded()
- && !hash_join.right.boundedness().is_unbounded()
- && matches!(
- *hash_join.join_type(),
- JoinType::Inner
- | JoinType::Left
- | JoinType::LeftSemi
- | JoinType::LeftAnti
- )
- {
- input = swap_join_according_to_unboundedness(hash_join)?;
- }
+ if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>()
+ && hash_join.left.boundedness().is_unbounded()
+ && !hash_join.right.boundedness().is_unbounded()
+ && matches!(
+ *hash_join.join_type(),
+ JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti
+ )
+ {
+ input = swap_join_according_to_unboundedness(hash_join)?;
}
Ok(input)
}
@@ -612,14 +608,14 @@ mod test {
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
- common::{stats::Precision, ColumnStatistics, JoinSide, JoinType, Statistics},
+ common::{ColumnStatistics, JoinSide, JoinType, Statistics, stats::Precision},
config::ConfigOptions,
logical_expr::Operator,
physical_plan::{
+ ExecutionPlan,
expressions::{BinaryExpr, Column},
joins::utils::{ColumnIndex, JoinFilter},
test::exec::StatisticsExec,
- ExecutionPlan,
},
};
//
diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs
index 80fa8440..a185f89b 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -26,13 +26,13 @@ use ballista_core::{
serde::scheduler::PartitionLocation,
};
use datafusion::config::ConfigOptions;
-use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::{
- with_new_children_if_necessary, ExecutionPlan, Partitioning,
+ ExecutionPlan, Partitioning, with_new_children_if_necessary,
};
use log::{debug, info};
@@ -338,7 +338,7 @@ mod test {
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::windows::BoundedWindowAggExec;
- use datafusion::physical_plan::{displayable, ExecutionPlan};
+ use datafusion::physical_plan::{ExecutionPlan, displayable};
use datafusion::physical_plan::{InputOrderMode, Partitioning};
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf::LogicalPlanNode;
diff --git a/ballista/scheduler/src/scheduler_process.rs b/ballista/scheduler/src/scheduler_process.rs
index 4cb4d81c..361ade55 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+use ballista_core::BALLISTA_VERSION;
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer;
use ballista_core::serde::{
BallistaCodec, BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec,
};
-use ballista_core::BALLISTA_VERSION;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
@@ -35,9 +35,9 @@ use crate::cluster::BallistaCluster;
use crate::config::SchedulerConfig;
use crate::metrics::default_metrics_collector;
+use crate::scheduler_server::SchedulerServer;
#[cfg(feature = "keda-scaler")]
use crate::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
-use crate::scheduler_server::SchedulerServer;
/// Creates as initialized scheduler service
/// without exposing it as a grpc service
diff --git a/ballista/scheduler/src/scheduler_server/external_scaler.rs b/ballista/scheduler/src/scheduler_server/external_scaler.rs
index f2feb243..021670e4 100644
--- a/ballista/scheduler/src/scheduler_server/external_scaler.rs
+++ b/ballista/scheduler/src/scheduler_server/external_scaler.rs
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use crate::scheduler_server::SchedulerServer;
use crate::scheduler_server::externalscaler::{
- external_scaler_server::ExternalScaler, GetMetricSpecResponse, GetMetricsRequest,
- GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, ScaledObjectRef,
+ GetMetricSpecResponse, GetMetricsRequest, GetMetricsResponse, IsActiveResponse,
+ MetricSpec, MetricValue, ScaledObjectRef, external_scaler_server::ExternalScaler,
};
-use crate::scheduler_server::SchedulerServer;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 58f53cba..2860ff1b 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -21,14 +21,15 @@ use ballista_core::extension::SessionConfigHelperExt;
use ballista_core::serde::protobuf::execute_query_params::Query;
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
use ballista_core::serde::protobuf::{
- execute_query_failure_result, execute_query_result, AvailableTaskSlots,
- CancelJobParams, CancelJobResult, CleanJobDataParams, CleanJobDataResult,
- CreateUpdateSessionParams, CreateUpdateSessionResult, ExecuteQueryFailureResult,
- ExecuteQueryParams, ExecuteQueryResult, ExecuteQuerySuccessResult, ExecutorHeartbeat,
- ExecutorStoppedParams, ExecutorStoppedResult, GetJobStatusParams, GetJobStatusResult,
- HeartBeatParams, HeartBeatResult, PollWorkParams, PollWorkResult,
- RegisterExecutorParams, RegisterExecutorResult, RemoveSessionParams,
- RemoveSessionResult, UpdateTaskStatusParams, UpdateTaskStatusResult,
+ AvailableTaskSlots, CancelJobParams, CancelJobResult, CleanJobDataParams,
+ CleanJobDataResult, CreateUpdateSessionParams, CreateUpdateSessionResult,
+ ExecuteQueryFailureResult, ExecuteQueryParams, ExecuteQueryResult,
+ ExecuteQuerySuccessResult, ExecutorHeartbeat, ExecutorStoppedParams,
+ ExecutorStoppedResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams,
+ HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
+ RegisterExecutorResult, RemoveSessionParams, RemoveSessionResult,
+ UpdateTaskStatusParams, UpdateTaskStatusResult, execute_query_failure_result,
+ execute_query_result,
};
use ballista_core::serde::scheduler::ExecutorMetadata;
use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -116,16 +117,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
TaskDistributionPolicy::RoundRobin => {
                bind_task_round_robin(available_slots, running_jobs, |_| false).await
}
- TaskDistributionPolicy::ConsistentHash{..} => {
+ TaskDistributionPolicy::ConsistentHash { .. } => {
return Err(Status::unimplemented(
- "ConsistentHash TaskDistribution is not feasible for
pull-based task scheduling"))
+ "ConsistentHash TaskDistribution is not feasible for
pull-based task scheduling",
+ ));
}
- TaskDistributionPolicy::Custom(ref policy) =>{
-                policy.bind_tasks(available_slots, running_jobs).await.map_err(|e| {
- Status::internal(e.to_string())
- })?
- }
+ TaskDistributionPolicy::Custom(ref policy) => policy
+ .bind_tasks(available_slots, running_jobs)
+ .await
+ .map_err(|e| Status::internal(e.to_string()))?,
};
let mut tasks = vec![];
@@ -345,7 +346,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
let job_id = self.state.task_manager.generate_job_id();
- info!("execution query - session_id: {session_id}, operation_id:
{operation_id}, job_name: {job_name}, job_id: {job_id}");
+ info!(
+ "execution query - session_id: {session_id}, operation_id:
{operation_id}, job_name: {job_name}, job_id: {job_id}"
+ );
let (session_id, session_ctx) = {
            let session_config = self.state.session_manager.produce_config();
@@ -547,12 +550,12 @@ mod test {
use crate::config::SchedulerConfig;
use crate::metrics::default_metrics_collector;
use ballista_core::error::BallistaError;
+ use ballista_core::serde::BallistaCodec;
use ballista_core::serde::protobuf::{
-        executor_status, ExecutorRegistration, ExecutorStatus, ExecutorStoppedParams,
-        HeartBeatParams, PollWorkParams, RegisterExecutorParams,
+        ExecutorRegistration, ExecutorStatus, ExecutorStoppedParams, HeartBeatParams,
+        PollWorkParams, RegisterExecutorParams, executor_status,
};
use ballista_core::serde::scheduler::ExecutorSpecification;
- use ballista_core::serde::BallistaCodec;
use crate::state::SchedulerState;
use crate::test_utils::await_condition;
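Aside from the reordering, the `Custom` arm above is only re-flowed: the logic still binds tasks and converts any internal error into a gRPC status. A hedged sketch of that `map_err` pattern with toy types (the function names here are illustrative, not the scheduler's real API):

    use tonic::Status;

    // Stand-in for a task-binding call that can fail internally.
    fn bind_tasks(available_slots: usize) -> Result<usize, String> {
        if available_slots == 0 {
            Err("no available task slots".to_string())
        } else {
            Ok(available_slots)
        }
    }

    // Surface internal errors as Status::internal so `?` can cross the
    // gRPC boundary, the same shape used in the hunk above.
    fn handler(available_slots: usize) -> Result<usize, Status> {
        let bound = bind_tasks(available_slots)
            .map_err(|e| Status::internal(e.to_string()))?;
        Ok(bound)
    }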
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index e741601e..8f78c970 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -20,8 +20,8 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use ballista_core::error::Result;
use ballista_core::event_loop::{EventLoop, EventSender};
-use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::BallistaCodec;
+use ballista_core::serde::protobuf::TaskStatus;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::LogicalPlan;
@@ -40,8 +40,8 @@ use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
use crate::state::executor_manager::ExecutorManager;
-use crate::state::task_manager::TaskLauncher;
use crate::state::SchedulerState;
+use crate::state::task_manager::TaskLauncher;
// include the generated protobuf source as a submodule
#[cfg(feature = "keda-scaler")]
@@ -276,8 +276,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
let stop_reason = if terminating {
format!(
- "TERMINATING executor {executor_id} heartbeat timed
out after {}s", state.config.executor_termination_grace_period,
- )
+ "TERMINATING executor {executor_id} heartbeat
timed out after {}s",
+ state.config.executor_termination_grace_period,
+ )
} else {
format!(
"ACTIVE executor {executor_id} heartbeat timed out
after {}s",
@@ -388,7 +389,7 @@ mod test {
use ballista_core::extension::SessionConfigExt;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::functions_aggregate::sum::sum;
- use datafusion::logical_expr::{col, LogicalPlan};
+ use datafusion::logical_expr::{LogicalPlan, col};
use datafusion::prelude::SessionConfig;
use datafusion::test_util::scan_empty_with_partitions;
@@ -400,22 +401,22 @@ mod test {
use crate::config::SchedulerConfig;
+ use ballista_core::serde::BallistaCodec;
use ballista_core::serde::protobuf::{
-        failed_task, job_status, task_status, ExecutionError, FailedTask, JobStatus,
-        MultiTaskDefinition, ShuffleWritePartition, SuccessfulJob, SuccessfulTask,
-        TaskId, TaskStatus,
+        ExecutionError, FailedTask, JobStatus, MultiTaskDefinition,
+        ShuffleWritePartition, SuccessfulJob, SuccessfulTask, TaskId, TaskStatus,
+        failed_task, job_status, task_status,
};
use ballista_core::serde::scheduler::{
ExecutorData, ExecutorMetadata, ExecutorSpecification,
};
- use ballista_core::serde::BallistaCodec;
- use crate::scheduler_server::{timestamp_millis, SchedulerServer};
+ use crate::scheduler_server::{SchedulerServer, timestamp_millis};
use crate::test_utils::{
+        ExplodingTableProvider, SchedulerTest, TaskRunnerFn, TestMetricsCollector,
assert_completed_event, assert_failed_event, assert_no_submitted_event,
- assert_submitted_event, test_cluster_context, ExplodingTableProvider,
- SchedulerTest, TaskRunnerFn, TestMetricsCollector,
+ assert_submitted_event, test_cluster_context,
};
#[tokio::test]
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 2d151c1e..97e7f856 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -279,15 +279,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
QueryStageSchedulerEvent::ExecutorLost(executor_id, _) => {
                    match self.state.task_manager.executor_lost(&executor_id).await {
Ok(tasks) => {
- if !tasks.is_empty() {
- if let Err(e) = self
+ if !tasks.is_empty()
+ && let Err(e) = self
.state
.executor_manager
.cancel_running_tasks(tasks)
.await
- {
- warn!("Fail to cancel running tasks due to
{e:?}");
- }
+ {
+ warn!("Fail to cancel running tasks due to {e:?}");
}
}
Err(e) => {
@@ -335,12 +334,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
#[cfg(test)]
mod tests {
use crate::config::SchedulerConfig;
-    use crate::test_utils::{await_condition, SchedulerTest, TestMetricsCollector};
+    use crate::test_utils::{SchedulerTest, TestMetricsCollector, await_condition};
use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::error::Result;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::functions_aggregate::sum::sum;
- use datafusion::logical_expr::{col, LogicalPlan};
+ use datafusion::logical_expr::{LogicalPlan, col};
use datafusion::test_util::scan_empty_with_partitions;
use std::sync::Arc;
use std::time::Duration;
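The `if !tasks.is_empty() && let Err(e) = ...` shape above is a Rust 2024 let-chain: edition 2024 (with Rust 1.88 or newer) allows `&&` to combine a boolean guard and a fallible `if let` binding in a single condition, which is what lets this patch drop one nesting level here and in the similar hunks in state/mod.rs and execution_graph_dot.rs below. A small self-contained sketch with toy types:

    fn cancel(tasks: Vec<u32>) -> Result<(), String> {
        if tasks.contains(&0) {
            return Err("task 0 cannot be cancelled".to_string());
        }
        Ok(())
    }

    fn main() {
        let tasks = vec![0, 1, 2];
        // One condition instead of two nested `if`s (requires edition 2024).
        if !tasks.is_empty()
            && let Err(e) = cancel(tasks)
        {
            eprintln!("Fail to cancel running tasks due to {e:?}");
        }
    }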
diff --git a/ballista/scheduler/src/standalone.rs b/ballista/scheduler/src/standalone.rs
index c4e6295d..cf835a75 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -19,16 +19,16 @@ use crate::cluster::BallistaCluster;
use crate::config::SchedulerConfig;
use crate::metrics::default_metrics_collector;
use crate::scheduler_server::SchedulerServer;
+use ballista_core::ConfigProducer;
use ballista_core::extension::SessionConfigExt;
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::{
- create_grpc_server, default_config_producer, default_session_builder,
- GrpcServerConfig,
+ GrpcServerConfig, create_grpc_server, default_config_producer,
+ default_session_builder,
};
-use ballista_core::ConfigProducer;
use ballista_core::{
- error::Result, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
- BALLISTA_VERSION,
+ BALLISTA_VERSION, error::Result,
+ serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
};
use datafusion::execution::SessionState;
use datafusion::prelude::SessionConfig;
diff --git a/ballista/scheduler/src/state/distributed_explain.rs b/ballista/scheduler/src/state/distributed_explain.rs
index 8d602a4f..340aa501 100644
--- a/ballista/scheduler/src/state/distributed_explain.rs
+++ b/ballista/scheduler/src/state/distributed_explain.rs
@@ -25,13 +25,13 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::{ScalarValue, UnnestOptions};
use datafusion::logical_expr::{LogicalPlan, PlanType, StringifiedPlan};
use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::expressions::col;
use datafusion::physical_plan::expressions::lit;
use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
-use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use crate::state::execution_graph::ExecutionStage;
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index ce2eb717..846907ff 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use datafusion::physical_plan::display::DisplayableExecutionPlan;
-use datafusion::physical_plan::{accept, ExecutionPlan, ExecutionPlanVisitor};
+use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanVisitor, accept};
use datafusion::prelude::SessionConfig;
use log::{debug, error, info, warn};
@@ -31,11 +31,11 @@ use ballista_core::error::{BallistaError, Result};
use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
use ballista_core::serde::protobuf::failed_task::FailedReason;
use ballista_core::serde::protobuf::job_status::Status;
-use ballista_core::serde::protobuf::{job_status, FailedJob, ShuffleWritePartition};
-use ballista_core::serde::protobuf::{task_status, RunningTask};
+use ballista_core::serde::protobuf::{FailedJob, ShuffleWritePartition, job_status};
use ballista_core::serde::protobuf::{
FailedTask, JobStatus, ResultLost, RunningJob, SuccessfulJob, TaskStatus,
};
+use ballista_core::serde::protobuf::{RunningTask, task_status};
use ballista_core::serde::scheduler::{
ExecutorMetadata, PartitionId, PartitionLocation, PartitionStats,
};
@@ -314,8 +314,14 @@ impl ExecutionGraph {
                    let task_stage_attempt_num = task_status.stage_attempt_num as usize;
                    if task_stage_attempt_num < running_stage.stage_attempt_num {
- warn!("Ignore TaskStatus update with TID {} as
it's from Stage {}.{} and there is a more recent stage attempt {}.{} running",
- task_status.task_id, stage_id,
task_stage_attempt_num, stage_id, running_stage.stage_attempt_num);
+ warn!(
+ "Ignore TaskStatus update with TID {} as it's
from Stage {}.{} and there is a more recent stage attempt {}.{} running",
+ task_status.task_id,
+ stage_id,
+ task_stage_attempt_num,
+ stage_id,
+ running_stage.stage_attempt_num
+ );
continue;
}
                    let partition_id = task_status.clone().partition_id as usize;
@@ -359,7 +365,8 @@ impl ExecutionGraph {
if !failed_stages.is_empty() {
let error_msg = format!(
- "Stages was marked failed,
ignore FetchPartitionError from task {task_identity}");
+ "Stages was marked failed,
ignore FetchPartitionError from task {task_identity}"
+ );
warn!("{error_msg}");
} else {
                                    // There are different removal strategies here.
@@ -385,7 +392,9 @@ impl ExecutionGraph {
.entry(map_stage_id)
.or_default();
missing_inputs.extend(removed_map_partitions);
- warn!("Need to resubmit the
current running Stage {stage_id} and its map Stage {map_stage_id} due to
FetchPartitionError from task {task_identity}")
+ warn!(
+ "Need to resubmit the current
running Stage {stage_id} and its map Stage {map_stage_id} due to
FetchPartitionError from task {task_identity}"
+ )
}
} else {
let error_msg = format!(
@@ -415,7 +424,10 @@ impl ExecutionGraph {
} else {
let error_msg = format!(
"Task {} in Stage {} failed {}
times, fail the stage, most recent failure reason: {:?}",
- partition_id, stage_id,
max_task_failures, failed_task.error
+ partition_id,
+ stage_id,
+ max_task_failures,
+ failed_task.error
);
error!("{error_msg}");
                            failed_stages.insert(stage_id, error_msg);
@@ -428,7 +440,8 @@ impl ExecutionGraph {
}
None => {
let error_msg = format!(
- "Task {partition_id} in Stage
{stage_id} failed with unknown failure reasons, fail the stage");
+ "Task {partition_id} in Stage
{stage_id} failed with unknown failure reasons, fail the stage"
+ );
error!("{error_msg}");
failed_stages.insert(stage_id, error_msg);
}
@@ -499,68 +512,64 @@ impl ExecutionGraph {
                // handle delayed failed tasks if the stage's next attempt is still in UnResolved status.
                if let Some(task_status::Status::Failed(failed_task)) =
                    task_status.status
-                {
-                    if unsolved_stage.stage_attempt_num - task_stage_attempt_num
-                        == 1
-                    {
-                        let failed_reason = failed_task.failed_reason;
-                        match failed_reason {
-                            Some(FailedReason::ExecutionError(_)) => {
-                                should_ignore = false;
-                                failed_stages.insert(stage_id, failed_task.error);
-                            }
-                            Some(FailedReason::FetchPartitionError(
-                                fetch_partiton_error,
-                            )) if failed_stages.is_empty()
-                                && current_running_stages.contains(
-                                    &(fetch_partiton_error.map_stage_id as usize),
-                                )
-                                && !unsolved_stage
-                                    .last_attempt_failure_reasons
-                                    .contains(
-                                        &fetch_partiton_error.executor_id,
-                                    ) =>
-                            {
-                                should_ignore = false;
-                                unsolved_stage
-                                    .last_attempt_failure_reasons
-                                    .insert(
-                                        fetch_partiton_error.executor_id.clone(),
-                                    );
-                                let map_stage_id =
-                                    fetch_partiton_error.map_stage_id as usize;
-                                let map_partition_id = fetch_partiton_error
-                                    .map_partition_id as usize;
-                                let executor_id =
-                                    fetch_partiton_error.executor_id;
-                                let removed_map_partitions = unsolved_stage
-                                    .remove_input_partitions(
-                                        map_stage_id,
-                                        map_partition_id,
-                                        &executor_id,
-                                    )?;
-
-                                let missing_inputs = reset_running_stages
-                                    .entry(map_stage_id)
-                                    .or_default();
-                                missing_inputs.extend(removed_map_partitions);
-                                warn!("Need to reset the current running Stage {map_stage_id} due to late come FetchPartitionError from its parent stage {stage_id} of task {task_identity}");
-
-                                // If the previous other task updates had already mark the map stage success, need to remove it.
-                                if successful_stages.contains(&map_stage_id) {
-                                    successful_stages.remove(&map_stage_id);
-                                }
-                                if resolved_stages.contains(&stage_id) {
-                                    resolved_stages.remove(&stage_id);
-                                }
-                            }
-                            _ => {}
-                        }
-                    }
-                }
+                    && unsolved_stage.stage_attempt_num - task_stage_attempt_num
+                        == 1
+                {
+                    let failed_reason = failed_task.failed_reason;
+                    match failed_reason {
+                        Some(FailedReason::ExecutionError(_)) => {
+                            should_ignore = false;
+                            failed_stages.insert(stage_id, failed_task.error);
+                        }
+                        Some(FailedReason::FetchPartitionError(
+                            fetch_partiton_error,
+                        )) if failed_stages.is_empty()
+                            && current_running_stages.contains(
+                                &(fetch_partiton_error.map_stage_id as usize),
+                            )
+                            && !unsolved_stage
+                                .last_attempt_failure_reasons
+                                .contains(&fetch_partiton_error.executor_id) =>
+                        {
+                            should_ignore = false;
+                            unsolved_stage
+                                .last_attempt_failure_reasons
+                                .insert(fetch_partiton_error.executor_id.clone());
+                            let map_stage_id =
+                                fetch_partiton_error.map_stage_id as usize;
+                            let map_partition_id =
+                                fetch_partiton_error.map_partition_id as usize;
+                            let executor_id = fetch_partiton_error.executor_id;
+                            let removed_map_partitions = unsolved_stage
+                                .remove_input_partitions(
+                                    map_stage_id,
+                                    map_partition_id,
+                                    &executor_id,
+                                )?;
+
+                            let missing_inputs = reset_running_stages
+                                .entry(map_stage_id)
+                                .or_default();
+                            missing_inputs.extend(removed_map_partitions);
+                            warn!(
+                                "Need to reset the current running Stage {map_stage_id} due to late come FetchPartitionError from its parent stage {stage_id} of task {task_identity}"
+                            );
+
+                            // If the previous other task updates had already mark the map stage success, need to remove it.
+                            if successful_stages.contains(&map_stage_id) {
+                                successful_stages.remove(&map_stage_id);
+                            }
+                            if resolved_stages.contains(&stage_id) {
+                                resolved_stages.remove(&stage_id);
+                            }
+                        }
+                        _ => {}
+                    }
+                }
                if should_ignore {
-                    warn!("Ignore TaskStatus update of task with TID {task_identity} as the Stage {job_id}/{stage_id} is in UnResolved status");
+                    warn!(
+                        "Ignore TaskStatus update of task with TID {task_identity} as the Stage {job_id}/{stage_id} is in UnResolved status"
+                    );
                }
            }
} else {
@@ -568,7 +577,10 @@ impl ExecutionGraph {
"Stage {}/{} is not in running when updating the
status of tasks {:?}",
job_id,
stage_id,
- stage_task_statuses.into_iter().map(|task_status|
task_status.partition_id).collect::<Vec<_>>(),
+ stage_task_statuses
+ .into_iter()
+ .map(|task_status| task_status.partition_id)
+ .collect::<Vec<_>>(),
);
}
} else {
@@ -605,7 +617,8 @@ impl ExecutionGraph {
}
} else {
warn!(
- "Stage {job_id}/{stage_id} is not in Successful state
when try to resubmit this stage. ");
+ "Stage {job_id}/{stage_id} is not in Successful state
when try to resubmit this stage. "
+ );
}
} else {
return Err(BallistaError::Internal(format!(
@@ -628,7 +641,8 @@ impl ExecutionGraph {
}
} else {
warn!(
- "Stage {job_id}/{stage_id} is not in Running state
when try to reset the running task. ");
+ "Stage {job_id}/{stage_id} is not in Running state
when try to reset the running task. "
+ );
}
} else {
return Err(BallistaError::Internal(format!(
@@ -1324,8 +1338,15 @@ impl Debug for ExecutionGraph {
.map(|stage| format!("{stage:?}"))
.collect::<Vec<String>>()
.join("");
- write!(f, "ExecutionGraph[job_id={}, session_id={},
available_tasks={}, is_successful={}]\n{}",
- self.job_id, self.session_id, self.available_tasks(),
self.is_successful(), stages)
+ write!(
+ f,
+ "ExecutionGraph[job_id={}, session_id={}, available_tasks={},
is_successful={}]\n{}",
+ self.job_id,
+ self.session_id,
+ self.available_tasks(),
+ self.is_successful(),
+ stages
+ )
}
}
@@ -1531,8 +1552,8 @@ mod test {
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use ballista_core::error::Result;
use ballista_core::serde::protobuf::{
-        self, failed_task, job_status, ExecutionError, FailedTask, FetchPartitionError,
-        IoError, JobStatus, TaskKilled,
+        self, ExecutionError, FailedTask, FetchPartitionError, IoError, JobStatus,
+        TaskKilled, failed_task, job_status,
};
use crate::state::execution_graph::ExecutionGraph;
@@ -1887,7 +1908,9 @@ mod test {
assert_eq!(last_attempt, 3);
let failure_reason = format!("{:?}", agg_graph.status);
-    assert!(failure_reason.contains("Task 1 in Stage 2 failed 4 times, fail the stage, most recent failure reason"));
+    assert!(failure_reason.contains(
+        "Task 1 in Stage 2 failed 4 times, fail the stage, most recent failure reason"
+ ));
assert!(failure_reason.contains("IOError"));
assert!(!agg_graph.is_successful());
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs b/ballista/scheduler/src/state/execution_graph_dot.rs
index e95a1b5a..49ef7a0a 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -219,11 +219,11 @@ fn sanitize(str: &str, max_len: Option<usize>) -> String {
}
    // truncate after translation because we know we only have ASCII chars at this point
// so the slice is safe (not splitting unicode character bytes)
- if let Some(limit) = max_len {
- if sanitized.len() > limit {
- sanitized.truncate(limit);
- return sanitized + " ...";
- }
+ if let Some(limit) = max_len
+ && sanitized.len() > limit
+ {
+ sanitized.truncate(limit);
+ return sanitized + " ...";
}
sanitized
}
diff --git a/ballista/scheduler/src/state/execution_stage.rs b/ballista/scheduler/src/state/execution_stage.rs
index 57e53467..e4324249 100644
--- a/ballista/scheduler/src/state/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_stage.rs
@@ -34,10 +34,10 @@ use log::{debug, warn};
use ballista_core::error::{BallistaError, Result};
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf::failed_task::FailedReason;
-use ballista_core::serde::protobuf::{task_status, RunningTask};
use ballista_core::serde::protobuf::{
FailedTask, OperatorMetricsSet, ResultLost, SuccessfulTask, TaskStatus,
};
+use ballista_core::serde::protobuf::{RunningTask, task_status};
use ballista_core::serde::scheduler::PartitionLocation;
use crate::display::DisplayableBallistaExecutionPlan;
@@ -302,7 +302,10 @@ impl UnresolvedStage {
stage_inputs.add_partition(partition);
}
} else {
- return Err(BallistaError::Internal(format!("Error adding input
partitions to stage {}, {} is not a valid child stage ID", self.stage_id,
stage_id)));
+ return Err(BallistaError::Internal(format!(
+ "Error adding input partitions to stage {}, {} is not a valid
child stage ID",
+ self.stage_id, stage_id
+ )));
}
Ok(())
@@ -333,7 +336,10 @@ impl UnresolvedStage {
stage_output.complete = false;
Ok(bad_map_partitions)
} else {
- Err(BallistaError::Internal(format!("Error remove input partition
for Stage {}, {} is not a valid child stage ID", self.stage_id,
input_stage_id)))
+ Err(BallistaError::Internal(format!(
+ "Error remove input partition for Stage {}, {} is not a valid
child stage ID",
+ self.stage_id, input_stage_id
+ )))
}
}
@@ -623,8 +629,10 @@ impl RunningStage {
let task_info = self.task_infos[partition_id].as_ref().unwrap();
let task_id = task_info.task_id;
if (status.task_id as usize) < task_id {
- warn!("Ignore TaskStatus update with TID {} because there is more
recent task attempt with TID {} running for partition {}",
- status.task_id, task_id, partition_id);
+ warn!(
+ "Ignore TaskStatus update with TID {} because there is more
recent task attempt with TID {} running for partition {}",
+ status.task_id, task_id, partition_id
+ );
return false;
}
let scheduled_time = task_info.scheduled_time;
@@ -667,8 +675,14 @@ impl RunningStage {
let new_metrics_set = if let Some(combined_metrics) = &mut
self.stage_metrics {
if metrics.len() != combined_metrics.len() {
- return Err(BallistaError::Internal(format!("Error updating
task metrics to stage {}, task metrics array size {} does not equal \
- with the stage metrics array size {} for task {}",
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
+ return Err(BallistaError::Internal(format!(
+ "Error updating task metrics to stage {}, task metrics
array size {} does not equal \
+ with the stage metrics array size {} for task {}",
+ self.stage_id,
+ metrics.len(),
+ combined_metrics.len(),
+ partition
+ )));
}
let metrics_values_array = metrics
.into_iter()
@@ -776,7 +790,10 @@ impl RunningStage {
stage_output.complete = false;
Ok(bad_map_partitions)
} else {
- Err(BallistaError::Internal(format!("Error remove input partition
for Stage {}, {} is not a valid child stage ID", self.stage_id,
input_stage_id)))
+ Err(BallistaError::Internal(format!(
+ "Error remove input partition for Stage {}, {} is not a valid
child stage ID",
+ self.stage_id, input_stage_id
+ )))
}
}
}
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index d5f7297b..26ffa02d 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -30,12 +30,12 @@ use crate::state::task_manager::JobInfoCache;
use ballista_core::extension::SessionConfigExt;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
use ballista_core::serde::protobuf::{
- executor_status, CancelTasksParams, ExecutorHeartbeat, MultiTaskDefinition,
- RemoveJobDataParams, StopExecutorParams,
+    CancelTasksParams, ExecutorHeartbeat, MultiTaskDefinition, RemoveJobDataParams,
+ StopExecutorParams, executor_status,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use ballista_core::utils::{
- create_grpc_client_connection, get_time_before, GrpcClientConfig,
+ GrpcClientConfig, create_grpc_client_connection, get_time_before,
};
use dashmap::DashMap;
use log::{debug, error, info, warn};
@@ -201,8 +201,8 @@ impl ExecutorManager {
.await
{
warn!(
- "Failed to call remove_job_data on
Executor {executor} due to {err:?}"
- )
+ "Failed to call remove_job_data on Executor
{executor} due to {err:?}"
+ )
}
});
} else {
@@ -248,8 +248,7 @@ pub async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
trace!(
"save executor metadata {} with {} task slots (pull-based
registration)",
- metadata.id,
- metadata.specification.task_slots
+ metadata.id, metadata.specification.task_slots
);
self.cluster_state.save_executor_metadata(metadata).await
}
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 76139272..d6a4ceef 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -40,8 +40,8 @@ use crate::config::SchedulerConfig;
use crate::state::execution_graph::TaskDescription;
use ballista_core::error::{BallistaError, Result};
use ballista_core::event_loop::EventSender;
-use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::BallistaCodec;
+use ballista_core::serde::protobuf::TaskStatus;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::empty::EmptyExec;
@@ -197,13 +197,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
if_revive = true;
}
}
- if if_revive {
- if let Err(e) = sender
+ if if_revive
+ && let Err(e) = sender
.post_event(QueryStageSchedulerEvent::ReviveOffers)
.await
- {
- error!("Fail to send revive offers event due to {e:?}");
- }
+ {
+ error!("Fail to send revive offers event due to {e:?}");
}
});
@@ -229,12 +228,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
match self.task_manager.executor_lost(executor_id).await {
Ok(tasks) => {
- if !tasks.is_empty() {
- if let Err(e) =
+ if !tasks.is_empty()
+ && let Err(e) =
self.executor_manager.cancel_running_tasks(tasks).await
- {
- warn!("Fail to cancel running tasks due to {e:?}");
- }
+ {
+ warn!("Fail to cancel running tasks due to {e:?}");
}
}
Err(e) => {
@@ -308,7 +306,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
}
}
Err(e) => {
- error!("Failed to launch new task, could not get
executor metadata: {e}");
+ error!(
+ "Failed to launch new task, could not get executor
metadata: {e}"
+ );
false
}
};
diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs
index 7538aff6..eefa9e35 100644
--- a/ballista/scheduler/src/state/session_manager.rs
+++ b/ballista/scheduler/src/state/session_manager.rs
@@ -61,7 +61,9 @@ pub fn create_datafusion_context(
// should we disable catalog on the scheduler side
.with_round_robin_repartition(false);
- log::warn!("session manager will override
`datafusion.optimizer.enable_round_robin_repartition` to `false` ");
+ log::warn!(
+ "session manager will override
`datafusion.optimizer.enable_round_robin_repartition` to `false` "
+ );
session_builder(session_config)?
} else {
session_builder(session_config.clone())?
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index e49c7b51..66376a7f 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -30,11 +30,11 @@ use datafusion::prelude::SessionConfig;
use rand::distr::Alphanumeric;
use crate::cluster::JobState;
+use ballista_core::serde::BallistaCodec;
use ballista_core::serde::protobuf::{
-    job_status, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId, TaskStatus,
+    JobStatus, MultiTaskDefinition, TaskDefinition, TaskId, TaskStatus, job_status,
};
use ballista_core::serde::scheduler::ExecutorMetadata;
-use ballista_core::serde::BallistaCodec;
use dashmap::DashMap;
use datafusion::physical_plan::ExecutionPlan;
@@ -42,7 +42,7 @@ use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
use datafusion_proto::protobuf::PhysicalPlanNode;
use log::{debug, error, info, trace, warn};
-use rand::{rng, Rng};
+use rand::{Rng, rng};
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::sync::Arc;
@@ -364,7 +364,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
)?
} else {
// TODO Deal with curator changed case
- error!("Fail to find job {job_id} in the active cache and it
may not be curated by this scheduler");
+ error!(
+ "Fail to find job {job_id} in the active cache and it may
not be curated by this scheduler"
+ );
vec![]
};
@@ -431,7 +433,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
(running_tasks, pending_tasks)
} else {
// TODO listen the job state update event and fix task cancelling
- warn!("Fail to find job {job_id} in the cache, unable to cancel
tasks for job, fail the job state only.");
+ warn!(
+ "Fail to find job {job_id} in the cache, unable to cancel
tasks for job, fail the job state only."
+ );
(vec![], 0)
};
@@ -587,7 +591,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
.iter()
.map(|task| task.partition.partition_id)
.collect();
- debug!("Preparing multi task definition for tasks {task_ids:?}
belonging to job stage {job_id}/{stage_id}");
+ debug!(
+ "Preparing multi task definition for tasks {task_ids:?}
belonging to job stage {job_id}/{stage_id}"
+ );
trace!("With task details {tasks:?}");
}
@@ -626,7 +632,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
Ok(multi_tasks)
} else {
- Err(BallistaError::General(format!("Cannot prepare multi task
definition for job {job_id} which is not in active cache")))
+ Err(BallistaError::General(format!(
+ "Cannot prepare multi task definition for job {job_id}
which is not in active cache"
+ )))
}
} else {
Err(BallistaError::General(
@@ -669,7 +677,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
    /// Clean up a failed job in FailedJobs Keyspace by delayed clean_up_interval seconds
    pub(crate) fn clean_up_job_delayed(&self, job_id: String, clean_up_interval: u64) {
if clean_up_interval == 0 {
- info!("The interval is 0 and the clean up for the failed job state
{job_id} will not triggered");
+ info!(
+ "The interval is 0 and the clean up for the failed job state
{job_id} will not triggered"
+ );
return;
}
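The reflowed log calls in this file read more naturally because the messages use inline format captures (stable since Rust 2021): the identifier lives inside the format string, so there is no separate argument list left to wrap. A tiny self-contained sketch:

    fn main() {
        let job_id = "job-42";
        let clean_up_interval = 0u64;
        // `{job_id}` and `{clean_up_interval}` are captured directly from
        // the enclosing scope; no trailing `, job_id, ...` arguments.
        println!("clean up for failed job {job_id} skipped (interval = {clean_up_interval})");
    }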
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index 5014a7e5..e30ec4d1 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -29,29 +29,29 @@ use async_trait::async_trait;
use crate::config::SchedulerConfig;
use crate::metrics::SchedulerMetricsCollector;
use crate::planner::DefaultDistributedPlanner;
-use crate::scheduler_server::{timestamp_millis, SchedulerServer};
+use crate::scheduler_server::{SchedulerServer, timestamp_millis};
use crate::state::executor_manager::ExecutorManager;
use crate::state::task_manager::TaskLauncher;
use ballista_core::serde::protobuf::job_status::Status;
use ballista_core::serde::protobuf::{
-    task_status, FailedTask, JobStatus, MultiTaskDefinition, ShuffleWritePartition,
-    SuccessfulTask, TaskId, TaskStatus,
+    FailedTask, JobStatus, MultiTaskDefinition, ShuffleWritePartition, SuccessfulTask,
+ TaskId, TaskStatus, task_status,
};
use ballista_core::serde::scheduler::{
ExecutorData, ExecutorMetadata, ExecutorSpecification,
};
-use ballista_core::serde::{protobuf, BallistaCodec};
+use ballista_core::serde::{BallistaCodec, protobuf};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::functions_aggregate::{count::count, sum::sum};
use datafusion::logical_expr::{Expr, LogicalPlan, SortExpr};
-use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::{col, CsvReadOptions, JoinType};
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::prelude::{CsvReadOptions, JoinType, col};
use datafusion::test_util::scan_empty_with_partitions;
use crate::cluster::BallistaCluster;
@@ -61,7 +61,7 @@ use crate::state::execution_graph::{ExecutionGraph, ExecutionStage, TaskDescript
use ballista_core::utils::{default_config_producer, default_session_builder};
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
use parking_lot::Mutex;
-use tokio::sync::mpsc::{channel, Receiver, Sender};
+use tokio::sync::mpsc::{Receiver, Sender, channel};
pub const TPCH_TABLES: &[&str] = &[
"part", "supplier", "partsupp", "customer", "orders", "lineitem",
"nation", "region",
@@ -552,7 +552,7 @@ impl SchedulerTest {
{
match inner {
Status::Failed(_) | Status::Successful(_) => {
- break Ok(status.unwrap())
+ break Ok(status.unwrap());
}
_ => {
if time >= timeout_ms {
@@ -587,7 +587,7 @@ impl SchedulerTest {
{
match inner {
Status::Failed(_) | Status::Successful(_) => {
- break Ok(status.unwrap())
+ break Ok(status.unwrap());
}
_ => continue,
}
@@ -641,7 +641,7 @@ impl SchedulerTest {
{
match inner {
Status::Failed(_) | Status::Successful(_) => {
- break Ok(status.unwrap())
+ break Ok(status.unwrap());
}
_ => continue,
}
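The repeated `break Ok(status.unwrap())` → `break Ok(status.unwrap());` hunks here (and the `return Err(...)` ones in tpch.rs below) appear to come from the 2024 style edition's rule that a diverging trailing expression such as `return` or `break` gets a terminating semicolon. A std-only sketch:

    fn first_even(xs: &[i32]) -> Option<i32> {
        for &x in xs {
            match x % 2 {
                0 => {
                    // 2024 style keeps the semicolon on the diverging
                    // `return`; 2021 style left it as a bare tail expression.
                    return Some(x);
                }
                _ => {}
            }
        }
        None
    }

    fn main() {
        assert_eq!(first_even(&[1, 3, 4]), Some(4));
        assert_eq!(first_even(&[1, 3]), None);
    }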
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 9dbf4690..ee6502aa 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -19,7 +19,7 @@
name = "ballista-benchmarks"
description = "Ballista Benchmarks"
version = "50.0.0"
-edition = "2021"
+edition = "2024"
authors = ["Apache DataFusion <[email protected]>"]
homepage = "https://datafusion.apache.org/ballista/"
repository = "https://github.com/apache/datafusion-ballista"
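Bumping `edition` in a manifest such as this one also changes Cargo's default dependency resolver to v3 unless a resolver is pinned explicitly. A sketch of the relevant keys with illustrative values (not copied from this workspace):

    [package]
    name = "example-crate"
    version = "0.1.0"
    edition = "2024"      # Rust 2024 semantics; requires rustc 1.85 or newer
    rust-version = "1.85" # illustrative MSRV

    [workspace]
    # v3 is the edition-2024 default, written out here for clarity; it makes
    # dependency resolution MSRV-aware.
    resolver = "3"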
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 86a4187e..d1401a65 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -26,19 +26,19 @@ use datafusion::common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
-use datafusion::execution::context::SessionState;
use datafusion::execution::SessionStateBuilder;
+use datafusion::execution::context::SessionState;
use datafusion::logical_expr::LogicalPlan;
-use datafusion::logical_expr::{expr::Cast, Expr};
+use datafusion::logical_expr::{Expr, expr::Cast};
use datafusion::parquet::basic::Compression;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion::{
- arrow::datatypes::{DataType, Field, Schema},
- datasource::file_format::{csv::CsvFormat, FileFormat},
DATAFUSION_VERSION,
+ arrow::datatypes::{DataType, Field, Schema},
+ datasource::file_format::{FileFormat, csv::CsvFormat},
};
use datafusion::{
arrow::record_batch::RecordBatch,
datasource::file_format::parquet::ParquetFormat,
@@ -609,7 +609,7 @@ async fn register_tables(
other => {
return Err(DataFusionError::Plan(format!(
"Invalid file format '{other}'"
- )))
+ )));
}
}
}
@@ -646,7 +646,7 @@ fn get_query_sql(query: usize) -> Result<Vec<String>> {
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.map(|s| s.to_string())
- .collect())
+ .collect());
}
Err(e) => errors.push(format!("{filename}: {e}")),
};
@@ -771,7 +771,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
other => {
return Err(DataFusionError::NotImplemented(format!(
"Invalid compression format: {other}"
- )))
+ )));
}
};
let props = WriterProperties::builder()
@@ -782,7 +782,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
other => {
return Err(DataFusionError::NotImplemented(format!(
"Invalid output format: {other}"
- )))
+ )));
}
}
println!("Conversion completed in {} ms", start.elapsed().as_millis());
diff --git a/dev/msrvcheck/Cargo.toml b/dev/msrvcheck/Cargo.toml
index 9ab2e388..a7955f06 100644
--- a/dev/msrvcheck/Cargo.toml
+++ b/dev/msrvcheck/Cargo.toml
@@ -18,7 +18,7 @@
# MSRV checker for upstream DataFusion
[package]
name = "msrvcheck"
-edition = "2021"
+edition = "2024"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
diff --git a/dev/msrvcheck/src/main.rs b/dev/msrvcheck/src/main.rs
index 1773a7c9..0a15ddbe 100644
--- a/dev/msrvcheck/src/main.rs
+++ b/dev/msrvcheck/src/main.rs
@@ -56,7 +56,7 @@ fn main() -> CargoResult<()> {
for (package, version) in packages_with_rust_version {
if let Some(v) = version {
- if !v.is_compatible_with(project_msrv.as_partial()) {
+ if !project_msrv.is_compatible_with(v.as_partial()) {
panic!(
"package '{package}' has MSRV {v} not compatible with
current project MSRV {project_msrv}",
);
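The msrvcheck hunk swaps the receiver and the argument of `is_compatible_with`, so the check now runs in the intended direction; APIs of the shape `a.is_compatible_with(b)` silently answer the opposite question when called backwards. A deliberately simplified sketch with a toy version type (not the cargo types msrvcheck actually uses):

    // Ordering derives lexicographically over (major, minor, patch).
    #[derive(Debug, PartialEq, PartialOrd)]
    struct Msrv(u32, u32, u32);

    impl Msrv {
        // `true` when a toolchain at `toolchain` can build code whose
        // minimum supported Rust version is `self`.
        fn is_compatible_with(&self, toolchain: &Msrv) -> bool {
            toolchain >= self
        }
    }

    fn main() {
        let project_msrv = Msrv(1, 85, 0);
        let newer = Msrv(1, 88, 0);
        assert!(project_msrv.is_compatible_with(&newer)); // intended direction
        assert!(!newer.is_compatible_with(&project_msrv)); // inverted call
    }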
diff --git a/examples/examples/custom-executor.rs b/examples/examples/custom-executor.rs
index 6e0e6c69..1f07bd76 100644
--- a/examples/examples/custom-executor.rs
+++ b/examples/examples/custom-executor.rs
@@ -20,7 +20,7 @@ use ballista_core::object_store::{
};
use ballista_executor::executor_process::{
- start_executor_process, ExecutorProcessConfig,
+ ExecutorProcessConfig, start_executor_process,
};
use std::sync::Arc;
///
diff --git a/examples/examples/remote-dataframe.rs b/examples/examples/remote-dataframe.rs
index 53fb4adf..2837f0a8 100644
--- a/examples/examples/remote-dataframe.rs
+++ b/examples/examples/remote-dataframe.rs
@@ -20,7 +20,7 @@ use ballista_examples::test_util;
use datafusion::{
common::Result,
execution::SessionStateBuilder,
- prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext},
+ prelude::{ParquetReadOptions, SessionConfig, SessionContext, col, lit},
};
/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
diff --git a/examples/examples/standalone-sql.rs b/examples/examples/standalone-sql.rs
index 6c957668..53cdf208 100644
--- a/examples/examples/standalone-sql.rs
+++ b/examples/examples/standalone-sql.rs
@@ -17,7 +17,7 @@
use ballista::datafusion::{
common::Result,
- execution::{options::ParquetReadOptions, SessionStateBuilder},
+ execution::{SessionStateBuilder, options::ParquetReadOptions},
prelude::{SessionConfig, SessionContext},
};
use ballista::prelude::{SessionConfigExt, SessionContextExt};
diff --git a/examples/tests/common/mod.rs b/examples/tests/common/mod.rs
index e9aabb58..2c2f65ca 100644
--- a/examples/tests/common/mod.rs
+++ b/examples/tests/common/mod.rs
@@ -17,7 +17,7 @@
use ballista::prelude::SessionConfigExt;
use ballista_core::serde::{
- protobuf::scheduler_grpc_client::SchedulerGrpcClient, BallistaCodec,
+ BallistaCodec, protobuf::scheduler_grpc_client::SchedulerGrpcClient,
};
use ballista_core::{ConfigProducer, RuntimeProducer};
use ballista_scheduler::SessionBuilder;
@@ -25,8 +25,8 @@ use datafusion::execution::SessionState;
use datafusion::prelude::SessionConfig;
use object_store::aws::AmazonS3Builder;
use testcontainers_modules::minio::MinIO;
-use testcontainers_modules::testcontainers::core::{CmdWaitFor, ExecCommand};
use testcontainers_modules::testcontainers::ContainerRequest;
+use testcontainers_modules::testcontainers::core::{CmdWaitFor, ExecCommand};
use testcontainers_modules::{minio, testcontainers::ImageExt};
pub const REGION: &str = "eu-west-1";
diff --git a/examples/tests/object_store.rs b/examples/tests/object_store.rs
index b45a08ee..b79712ea 100644
--- a/examples/tests/object_store.rs
+++ b/examples/tests/object_store.rs
@@ -221,11 +221,11 @@ mod custom_s3_config {
use crate::common::{ACCESS_KEY_ID, SECRET_KEY};
use ballista::extension::SessionContextExt;
use ballista::prelude::SessionConfigExt;
- use ballista_core::object_store::{CustomObjectStoreRegistry, S3Options};
use ballista_core::RuntimeProducer;
+ use ballista_core::object_store::{CustomObjectStoreRegistry, S3Options};
use ballista_examples::test_util::examples_test_data;
- use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SessionState;
+ use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::SessionConfig;
use datafusion::{assert_batches_eq, prelude::SessionContext};
use datafusion::{error::DataFusionError, execution::SessionStateBuilder};
@@ -233,8 +233,8 @@ mod custom_s3_config {
use testcontainers_modules::testcontainers::runners::AsyncRunner;
#[tokio::test]
- async fn should_configure_s3_execute_sql_write_remote(
- ) -> datafusion::error::Result<()> {
+ async fn should_configure_s3_execute_sql_write_remote()
+ -> datafusion::error::Result<()> {
let test_data = examples_test_data();
//
@@ -378,8 +378,8 @@ mod custom_s3_config {
// SessionConfig propagation across ballista cluster.
#[tokio::test]
- async fn should_configure_s3_execute_sql_write_standalone(
- ) -> datafusion::error::Result<()> {
+ async fn should_configure_s3_execute_sql_write_standalone()
+ -> datafusion::error::Result<()> {
let test_data = examples_test_data();
//
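The two test-signature hunks above show another 2024 style-edition change: when a function's return type pushes the signature over `max_width`, rustfmt now keeps the empty parameter list attached to the name and breaks before `->`, instead of breaking inside the parentheses. A std-only sketch:

    // 2021 style:  fn long_name(
    //              ) -> Result<(), Box<dyn std::error::Error>> { ... }
    // 2024 style keeps `()` with the name and wraps before the return type:
    fn a_very_long_function_name_kept_long_only_to_force_a_rustfmt_line_break()
    -> Result<(), Box<dyn std::error::Error>> {
        Ok(())
    }

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        a_very_long_function_name_kept_long_only_to_force_a_rustfmt_line_break()
    }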
diff --git a/rustfmt.toml b/rustfmt.toml
index b5292068..3b15ecf6 100644
--- a/rustfmt.toml
+++ b/rustfmt.toml
@@ -15,6 +15,6 @@
# specific language governing permissions and limitations
# under the License.
-edition = "2021"
+edition = "2024"
max_width = 90
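rustfmt derives its style edition from the configured `edition` unless `style_edition` is set separately, so this one-line change is what opts the repository into the 2024 formatting seen throughout the diff. A sketch of the config surface; the commented `style_edition` line is hypothetical and not part of this patch:

    edition = "2024"         # parse sources as Rust 2024; style edition follows
    # style_edition = "2021" # hypothetical: keep the old formatting if desired
    max_width = 90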
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]