This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new bece30f0f feat: Make push scheduling policy default as it has lower
latency (#1461)
bece30f0f is described below
commit bece30f0fd781a8a48ebb7419e324a8827e0c04e
Author: Marko Milenković <[email protected]>
AuthorDate: Thu Feb 19 09:26:08 2026 +0000
feat: Make push scheduling policy default as it has lower latency (#1461)
* feat: make push scheduling policy default ...
... as it results in lower latency on benchmarks
* Update ballista/executor/src/config.rs
Co-authored-by: jgrim <[email protected]>
* make standalone pull based
* reduce some of the sleeps
---------
Co-authored-by: jgrim <[email protected]>
---
ballista/core/src/config.rs | 2 +-
ballista/core/src/execution_plans/distributed_query.rs | 2 +-
ballista/executor/src/config.rs | 2 +-
ballista/executor/src/execution_loop.rs | 2 +-
ballista/executor/src/executor_process.rs | 3 ++-
ballista/scheduler/src/config.rs | 2 +-
ballista/scheduler/src/scheduler_server/grpc.rs | 6 ++++--
ballista/scheduler/src/standalone.rs | 4 +++-
8 files changed, 14 insertions(+), 9 deletions(-)
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index 6448f0f53..15e031a16 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -463,9 +463,9 @@ impl datafusion::config::ConfigExtension for BallistaConfig
{
#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
pub enum TaskSchedulingPolicy {
/// Pull-based scheduling works in a similar way to Apache Spark
- #[default]
PullStaged,
/// push-based scheduling can result in lower latency.
+ #[default]
PushStaged,
}
impl Display for TaskSchedulingPolicy {
diff --git a/ballista/core/src/execution_plans/distributed_query.rs
b/ballista/core/src/execution_plans/distributed_query.rs
index d7edac56b..8fb6935c4 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -366,7 +366,7 @@ async fn execute_query(
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
.into_inner();
let status = status.and_then(|s| s.status);
- let wait_future = tokio::time::sleep(Duration::from_millis(100));
+ let wait_future = tokio::time::sleep(Duration::from_millis(50));
let has_status_change = prev_status != status;
match status {
None => {
diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs
index 2a58c9e92..56b8002da 100644
--- a/ballista/executor/src/config.rs
+++ b/ballista/executor/src/config.rs
@@ -78,7 +78,7 @@ pub struct Config {
)]
pub concurrent_tasks: usize,
/// Task scheduling policy: pull-staged (executor polls) or push-staged
(scheduler pushes).
- #[arg(short = 's', long, default_value_t =
ballista_core::config::TaskSchedulingPolicy::PullStaged, help = "The task
scheduling policy for the scheduler, possible values: pull-staged, push-staged.
Default: pull-staged")]
+ #[arg(short = 's', long, default_value_t =
ballista_core::config::TaskSchedulingPolicy::PushStaged, help = "The task
scheduling policy for the scheduler, possible values: pull-staged, push-staged.
Default: push-staged")]
pub task_scheduling_policy: ballista_core::config::TaskSchedulingPolicy,
/// Interval in seconds between job data cleanup runs (0 = disabled).
#[arg(
diff --git a/ballista/executor/src/execution_loop.rs
b/ballista/executor/src/execution_loop.rs
index 0d55793f2..fbf610f5c 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -203,7 +203,7 @@ where
}
if !active_job {
- tokio::time::sleep(Duration::from_millis(100)).await;
+ tokio::time::sleep(Duration::from_millis(50)).await;
}
}
}
diff --git a/ballista/executor/src/executor_process.rs
b/ballista/executor/src/executor_process.rs
index 702c8976b..ef57e2e7c 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -220,13 +220,14 @@ pub async fn start_executor_process(
} else {
opt.concurrent_tasks
};
-
+ let task_scheduling_policy = opt.task_scheduling_policy;
// assign this executor an unique ID
let executor_id = Uuid::new_v4().to_string();
info!("Executor starting ... (Datafusion Ballista {BALLISTA_VERSION})");
info!("Executor id: {executor_id}");
info!("Executor working directory: {work_dir}");
info!("Executor number of concurrent tasks: {concurrent_tasks}");
+ info!("Executor scheduling policy: {task_scheduling_policy:?}");
let executor_meta = ExecutorRegistration {
id: executor_id.clone(),
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 49d4b3f49..4f520aff5 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -68,7 +68,7 @@ pub struct Config {
#[arg(
short = 's',
long,
- default_value_t =
ballista_core::config::TaskSchedulingPolicy::PullStaged,
+ default_value_t =
ballista_core::config::TaskSchedulingPolicy::PushStaged,
help = "The scheduling policy for the scheduler, possible values:
pull-staged, push-staged. Default: pull-staged"
)]
pub scheduler_policy: ballista_core::config::TaskSchedulingPolicy,
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs
b/ballista/scheduler/src/scheduler_server/grpc.rs
index f50ec7f1a..072b8a4d3 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -603,10 +603,12 @@ mod test {
use super::{SchedulerGrpc, SchedulerServer};
#[tokio::test]
- async fn test_poll_work() -> Result<(), BallistaError> {
+ async fn test_pull_work() -> Result<(), BallistaError> {
let cluster = test_cluster_context();
- let config = SchedulerConfig::default();
+ let config = SchedulerConfig::default().with_scheduler_policy(
+ ballista_core::config::TaskSchedulingPolicy::PullStaged,
+ );
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
diff --git a/ballista/scheduler/src/standalone.rs
b/ballista/scheduler/src/standalone.rs
index 075620192..553262d0c 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -90,7 +90,9 @@ pub async fn new_standalone_scheduler_with_builder(
"localhost:50050".to_owned(),
cluster,
codec,
- Arc::new(SchedulerConfig::default()),
+ Arc::new(SchedulerConfig::default().with_scheduler_policy(
+ ballista_core::config::TaskSchedulingPolicy::PullStaged,
+ )),
metrics_collector,
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]