This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new bece30f0f feat: Make push scheduling policy default as it has lower latency (#1461)
bece30f0f is described below

commit bece30f0fd781a8a48ebb7419e324a8827e0c04e
Author: Marko Milenković <[email protected]>
AuthorDate: Thu Feb 19 09:26:08 2026 +0000

    feat: Make push scheduling policy default as it has lower latency (#1461)
    
    * feat: make push scheduling policy default ...
    
    ... as it results in lower latency on benchmarks
    
    * Update ballista/executor/src/config.rs
    
    Co-authored-by: jgrim <[email protected]>
    
    * make standalone pull based
    
    * reduce some of the sleeps
    
    ---------
    
    Co-authored-by: jgrim <[email protected]>
---
 ballista/core/src/config.rs                            | 2 +-
 ballista/core/src/execution_plans/distributed_query.rs | 2 +-
 ballista/executor/src/config.rs                        | 2 +-
 ballista/executor/src/execution_loop.rs                | 2 +-
 ballista/executor/src/executor_process.rs              | 3 ++-
 ballista/scheduler/src/config.rs                       | 2 +-
 ballista/scheduler/src/scheduler_server/grpc.rs        | 6 ++++--
 ballista/scheduler/src/standalone.rs                   | 4 +++-
 8 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index 6448f0f53..15e031a16 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -463,9 +463,9 @@ impl datafusion::config::ConfigExtension for BallistaConfig {
 #[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
 pub enum TaskSchedulingPolicy {
     /// Pull-based scheduling works in a similar way to Apache Spark
-    #[default]
     PullStaged,
     /// push-based scheduling can result in lower latency.
+    #[default]
     PushStaged,
 }
 impl Display for TaskSchedulingPolicy {
diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs
index d7edac56b..8fb6935c4 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -366,7 +366,7 @@ async fn execute_query(
             .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
             .into_inner();
         let status = status.and_then(|s| s.status);
-        let wait_future = tokio::time::sleep(Duration::from_millis(100));
+        let wait_future = tokio::time::sleep(Duration::from_millis(50));
         let has_status_change = prev_status != status;
         match status {
             None => {
diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs
index 2a58c9e92..56b8002da 100644
--- a/ballista/executor/src/config.rs
+++ b/ballista/executor/src/config.rs
@@ -78,7 +78,7 @@ pub struct Config {
     )]
     pub concurrent_tasks: usize,
     /// Task scheduling policy: pull-staged (executor polls) or push-staged (scheduler pushes).
-    #[arg(short = 's', long, default_value_t = ballista_core::config::TaskSchedulingPolicy::PullStaged, help = "The task scheduling policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged")]
+    #[arg(short = 's', long, default_value_t = ballista_core::config::TaskSchedulingPolicy::PushStaged, help = "The task scheduling policy for the scheduler, possible values: pull-staged, push-staged. Default: push-staged")]
     pub task_scheduling_policy: ballista_core::config::TaskSchedulingPolicy,
     /// Interval in seconds between job data cleanup runs (0 = disabled).
     #[arg(
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 0d55793f2..fbf610f5c 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -203,7 +203,7 @@ where
         }
 
         if !active_job {
-            tokio::time::sleep(Duration::from_millis(100)).await;
+            tokio::time::sleep(Duration::from_millis(50)).await;
         }
     }
 }
diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs
index 702c8976b..ef57e2e7c 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -220,13 +220,14 @@ pub async fn start_executor_process(
     } else {
         opt.concurrent_tasks
     };
-
+    let task_scheduling_policy = opt.task_scheduling_policy;
     // assign this executor an unique ID
     let executor_id = Uuid::new_v4().to_string();
     info!("Executor starting ... (Datafusion Ballista {BALLISTA_VERSION})");
     info!("Executor id: {executor_id}");
     info!("Executor working directory: {work_dir}");
     info!("Executor number of concurrent tasks: {concurrent_tasks}");
+    info!("Executor scheduling policy: {task_scheduling_policy:?}");
 
     let executor_meta = ExecutorRegistration {
         id: executor_id.clone(),
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 49d4b3f49..4f520aff5 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -68,7 +68,7 @@ pub struct Config {
     #[arg(
         short = 's',
         long,
-        default_value_t = ballista_core::config::TaskSchedulingPolicy::PullStaged,
+        default_value_t = ballista_core::config::TaskSchedulingPolicy::PushStaged,
         help = "The scheduling policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged"
     )]
     pub scheduler_policy: ballista_core::config::TaskSchedulingPolicy,
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index f50ec7f1a..072b8a4d3 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -603,10 +603,12 @@ mod test {
     use super::{SchedulerGrpc, SchedulerServer};
 
     #[tokio::test]
-    async fn test_poll_work() -> Result<(), BallistaError> {
+    async fn test_pull_work() -> Result<(), BallistaError> {
         let cluster = test_cluster_context();
 
-        let config = SchedulerConfig::default();
+        let config = SchedulerConfig::default().with_scheduler_policy(
+            ballista_core::config::TaskSchedulingPolicy::PullStaged,
+        );
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
diff --git a/ballista/scheduler/src/standalone.rs b/ballista/scheduler/src/standalone.rs
index 075620192..553262d0c 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -90,7 +90,9 @@ pub async fn new_standalone_scheduler_with_builder(
             "localhost:50050".to_owned(),
             cluster,
             codec,
-            Arc::new(SchedulerConfig::default()),
+            Arc::new(SchedulerConfig::default().with_scheduler_policy(
+                ballista_core::config::TaskSchedulingPolicy::PullStaged,
+            )),
             metrics_collector,
         );
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to