This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b8d99c3 Handle job resubmission (#586)
0b8d99c3 is described below

commit 0b8d99c3077b28c0c7d2d96d38796dd9c5afaf02
Author: Dan Harris <[email protected]>
AuthorDate: Sat Feb 4 11:14:45 2023 +0200

    Handle job resubmission (#586)
    
    * Handle job resubmission
    
    * Make resubmission configurable and add test
    
    * Fix debug log
---
 ballista/scheduler/scheduler_config_spec.toml      |   6 ++
 ballista/scheduler/src/bin/main.rs                 |   2 +
 ballista/scheduler/src/config.rs                   |   9 ++
 ballista/scheduler/src/scheduler_server/event.rs   |   1 +
 ballista/scheduler/src/scheduler_server/mod.rs     |  28 ++++--
 .../src/scheduler_server/query_stage_scheduler.rs  | 110 ++++++++++++++++++---
 ballista/scheduler/src/state/executor_manager.rs   |   2 +-
 ballista/scheduler/src/test_utils.rs               |  36 +++++--
 8 files changed, 166 insertions(+), 28 deletions(-)

diff --git a/ballista/scheduler/scheduler_config_spec.toml 
b/ballista/scheduler/scheduler_config_spec.toml
index 0bb609cf..a0397771 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -142,3 +142,9 @@ name = "log_rotation_policy"
 type = "ballista_core::config::LogRotationPolicy"
 doc = "Tracing log rotation policy, possible values: minutely, hourly, daily, 
never. Default: daily"
 default = "ballista_core::config::LogRotationPolicy::Daily"
+
+[[param]]
+name = "job_resubmit_interval_ms"
+type = "u64"
+default = "0"
+doc = "If job is not able to be scheduled on submission, wait for this 
interval and resubmit. Default value of 0 indicates that job should not be 
resubmitted"
diff --git a/ballista/scheduler/src/bin/main.rs 
b/ballista/scheduler/src/bin/main.rs
index 62bfc083..b87bec71 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -129,6 +129,8 @@ async fn main() -> Result<()> {
         finished_job_state_clean_up_interval_seconds: opt
             .finished_job_state_clean_up_interval_seconds,
         advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
+        job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0)
+            .then_some(opt.job_resubmit_interval_ms),
     };
     start_server(scheduler_name, config_backend, cluster_state, addr, 
config).await?;
     Ok(())
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 97f79787..020d1c83 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -37,6 +37,9 @@ pub struct SchedulerConfig {
     pub finished_job_state_clean_up_interval_seconds: u64,
     /// The route endpoint for proxying flight sql results via scheduler
     pub advertise_flight_sql_endpoint: Option<String>,
+    /// If provided, submitted jobs which do not have tasks scheduled will be 
resubmitted after `job_resubmit_interval_ms`
+    /// milliseconds
+    pub job_resubmit_interval_ms: Option<u64>,
 }
 
 impl Default for SchedulerConfig {
@@ -48,6 +51,7 @@ impl Default for SchedulerConfig {
             finished_job_data_clean_up_interval_seconds: 300,
             finished_job_state_clean_up_interval_seconds: 3600,
             advertise_flight_sql_endpoint: None,
+            job_resubmit_interval_ms: None,
         }
     }
 }
@@ -95,6 +99,11 @@ impl SchedulerConfig {
         self.executor_slots_policy = policy;
         self
     }
+
+    pub fn with_job_resubmit_interval_ms(mut self, interval_ms: u64) -> Self {
+        self.job_resubmit_interval_ms = Some(interval_ms);
+        self
+    }
 }
 
 // an enum used to configure the executor slots policy
diff --git a/ballista/scheduler/src/scheduler_server/event.rs 
b/ballista/scheduler/src/scheduler_server/event.rs
index 74643535..21ce7fbb 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -37,6 +37,7 @@ pub enum QueryStageSchedulerEvent {
         job_id: String,
         queued_at: u64,
         submitted_at: u64,
+        resubmit: bool,
     },
     // For a job which failed during planning
     JobPlanningFailed {
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs 
b/ballista/scheduler/src/scheduler_server/mod.rs
index 96f463a5..8966d92c 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -54,7 +54,7 @@ pub mod externalscaler {
 pub mod event;
 mod external_scaler;
 mod grpc;
-mod query_stage_scheduler;
+pub(crate) mod query_stage_scheduler;
 
 pub(crate) type SessionBuilder = fn(SessionConfig) -> SessionState;
 
@@ -84,8 +84,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
             scheduler_name.clone(),
             config.clone(),
         ));
-        let query_stage_scheduler =
-            Arc::new(QueryStageScheduler::new(state.clone(), 
metrics_collector));
+        let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
+            state.clone(),
+            metrics_collector,
+            config.job_resubmit_interval_ms,
+        ));
         let query_stage_event_loop = EventLoop::new(
             "query_stage".to_owned(),
             config.event_loop_buffer_size as usize,
@@ -118,8 +121,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
             scheduler_name.clone(),
             config.clone(),
         ));
-        let query_stage_scheduler =
-            Arc::new(QueryStageScheduler::new(state.clone(), 
metrics_collector));
+        let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
+            state.clone(),
+            metrics_collector,
+            config.job_resubmit_interval_ms,
+        ));
         let query_stage_event_loop = EventLoop::new(
             "query_stage".to_owned(),
             config.event_loop_buffer_size as usize,
@@ -154,8 +160,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
             config.clone(),
             task_launcher,
         ));
-        let query_stage_scheduler =
-            Arc::new(QueryStageScheduler::new(state.clone(), 
metrics_collector));
+        let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
+            state.clone(),
+            metrics_collector,
+            config.job_resubmit_interval_ms,
+        ));
         let query_stage_event_loop = EventLoop::new(
             "query_stage".to_owned(),
             config.event_loop_buffer_size as usize,
@@ -179,6 +188,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
         Ok(())
     }
 
+    #[cfg(test)]
+    pub(crate) fn query_stage_scheduler(&self) -> Arc<QueryStageScheduler<T, 
U>> {
+        self.query_stage_scheduler.clone()
+    }
+
     pub(crate) fn pending_tasks(&self) -> usize {
         self.query_stage_scheduler.pending_tasks()
     }
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs 
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 380fdf34..25454188 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -17,6 +17,7 @@
 
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use std::time::Duration;
 
 use async_trait::async_trait;
 use log::{debug, error, info};
@@ -42,17 +43,20 @@ pub(crate) struct QueryStageScheduler<
     state: Arc<SchedulerState<T, U>>,
     metrics_collector: Arc<dyn SchedulerMetricsCollector>,
     pending_tasks: AtomicUsize,
+    job_resubmit_interval_ms: Option<u64>,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> 
QueryStageScheduler<T, U> {
     pub(crate) fn new(
         state: Arc<SchedulerState<T, U>>,
         metrics_collector: Arc<dyn SchedulerMetricsCollector>,
+        job_resubmit_interval_ms: Option<u64>,
     ) -> Self {
         Self {
             state,
             metrics_collector,
             pending_tasks: AtomicUsize::default(),
+            job_resubmit_interval_ms,
         }
     }
 
@@ -119,6 +123,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                             job_id,
                             queued_at,
                             submitted_at: timestamp_millis(),
+                            resubmit: false,
                         }
                     };
                     tx_event
@@ -132,11 +137,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                 job_id,
                 queued_at,
                 submitted_at,
+                resubmit,
             } => {
-                self.metrics_collector
-                    .record_submitted(&job_id, queued_at, submitted_at);
+                if !resubmit {
+                    self.metrics_collector.record_submitted(
+                        &job_id,
+                        queued_at,
+                        submitted_at,
+                    );
+
+                    info!("Job {} submitted", job_id);
+                } else {
+                    debug!("Job {} resubmitted", job_id);
+                }
 
-                info!("Job {} submitted", job_id);
                 if self.state.config.is_push_staged_scheduling() {
                     let available_tasks = self
                         .state
@@ -153,17 +167,42 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                         .map(|res| res.assign(job_id.clone()))
                         .collect();
 
-                    debug!(
-                        "Reserved {} task slots for submitted job {}",
-                        reservations.len(),
-                        job_id
-                    );
+                    if reservations.is_empty() && 
self.job_resubmit_interval_ms.is_some()
+                    {
+                        let wait_ms = self.job_resubmit_interval_ms.unwrap();
 
-                    tx_event
-                        
.post_event(QueryStageSchedulerEvent::ReservationOffering(
-                            reservations,
-                        ))
-                        .await?;
+                        debug!(
+                            "No task slots reserved for job {job_id}, 
resubmitting after {wait_ms}ms"
+                        );
+
+                        tokio::task::spawn(async move {
+                            
tokio::time::sleep(Duration::from_millis(wait_ms)).await;
+
+                            if let Err(e) = tx_event
+                                
.post_event(QueryStageSchedulerEvent::JobSubmitted {
+                                    job_id,
+                                    queued_at,
+                                    submitted_at,
+                                    resubmit: true,
+                                })
+                                .await
+                            {
+                                error!("error resubmitting job: {}", e);
+                            }
+                        });
+                    } else {
+                        debug!(
+                            "Reserved {} task slots for submitted job {}",
+                            reservations.len(),
+                            job_id
+                        );
+
+                        tx_event
+                            
.post_event(QueryStageSchedulerEvent::ReservationOffering(
+                                reservations,
+                            ))
+                            .await?;
+                    }
                 }
             }
             QueryStageSchedulerEvent::JobPlanningFailed {
@@ -314,15 +353,60 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
 #[cfg(test)]
 mod tests {
     use crate::config::SchedulerConfig;
+    use crate::scheduler_server::event::QueryStageSchedulerEvent;
     use crate::test_utils::{await_condition, SchedulerTest, 
TestMetricsCollector};
     use ballista_core::config::TaskSchedulingPolicy;
     use ballista_core::error::Result;
+    use ballista_core::event_loop::EventAction;
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use datafusion::logical_expr::{col, sum, LogicalPlan};
     use datafusion::test_util::scan_empty_with_partitions;
     use std::sync::Arc;
     use std::time::Duration;
 
+    #[tokio::test]
+    async fn test_job_resubmit() -> Result<()> {
+        let plan = test_plan(10);
+
+        let metrics_collector = Arc::new(TestMetricsCollector::default());
+
+        // Set resubmit interval of 1ms
+        let mut test = SchedulerTest::new(
+            SchedulerConfig::default()
+                .with_job_resubmit_interval_ms(1)
+                .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
+            metrics_collector.clone(),
+            0,
+            0,
+            None,
+        )
+        .await?;
+
+        test.submit("job-id", "job-name", &plan).await?;
+
+        let query_stage_scheduler = test.query_stage_scheduler();
+
+        let (tx, mut rx) = 
tokio::sync::mpsc::channel::<QueryStageSchedulerEvent>(10);
+
+        let event = QueryStageSchedulerEvent::JobSubmitted {
+            job_id: "job-id".to_string(),
+            queued_at: 0,
+            submitted_at: 0,
+            resubmit: false,
+        };
+
+        query_stage_scheduler.on_receive(event, &tx, &rx).await?;
+
+        let next_event = rx.recv().await.unwrap();
+
+        assert!(matches!(
+            next_event,
+            QueryStageSchedulerEvent::JobSubmitted { job_id, resubmit, .. } if 
job_id == "job-id" && resubmit
+        ));
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_pending_task_metric() -> Result<()> {
         let plan = test_plan(10);
diff --git a/ballista/scheduler/src/state/executor_manager.rs 
b/ballista/scheduler/src/state/executor_manager.rs
index 8f0a810b..38c8da68 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -163,7 +163,7 @@ impl ExecutorManager {
         } else {
             let alive_executors = self.get_alive_executors_within_one_minute();
 
-            println!("Alive executors: {alive_executors:?}");
+            debug!("Alive executors: {alive_executors:?}");
 
             self.cluster_state
                 .reserve_slots(n, self.task_distribution, 
Some(alive_executors))
diff --git a/ballista/scheduler/src/test_utils.rs 
b/ballista/scheduler/src/test_utils.rs
index 68d78a75..d5465f00 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -51,6 +51,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::CsvReadOptions;
 
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
 use crate::state::backend::cluster::DefaultClusterState;
 use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
 use parking_lot::Mutex;
@@ -384,13 +385,16 @@ impl SchedulerTest {
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
         let cluster_state = 
Arc::new(DefaultClusterState::new(state_storage.clone()));
 
-        let ballista_config = BallistaConfig::builder()
-            .set(
-                BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
-                format!("{}", num_executors * 
task_slots_per_executor).as_str(),
-            )
-            .build()
-            .expect("creating BallistaConfig");
+        let ballista_config = if num_executors > 0 && task_slots_per_executor 
> 0 {
+            BallistaConfig::builder()
+                .set(
+                    BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+                    format!("{}", num_executors * 
task_slots_per_executor).as_str(),
+                )
+                .build()?
+        } else {
+            BallistaConfig::builder().build()?
+        };
 
         let runner = runner.unwrap_or_else(|| Arc::new(default_task_runner()));
 
@@ -475,6 +479,7 @@ impl SchedulerTest {
         job_name: &str,
         plan: &LogicalPlan,
     ) -> Result<()> {
+        println!("{:?}", self.ballista_config);
         let ctx = self
             .scheduler
             .state
@@ -489,6 +494,17 @@ impl SchedulerTest {
         Ok(())
     }
 
+    pub async fn post_scheduler_event(
+        &self,
+        event: QueryStageSchedulerEvent,
+    ) -> Result<()> {
+        self.scheduler
+            .query_stage_event_loop
+            .get_sender()?
+            .post_event(event)
+            .await
+    }
+
     pub async fn tick(&mut self) -> Result<()> {
         if let Some(receiver) = self.status_receiver.as_mut() {
             if let Some((executor_id, status)) = receiver.recv().await {
@@ -596,6 +612,12 @@ impl SchedulerTest {
 
         final_status
     }
+
+    pub(crate) fn query_stage_scheduler(
+        &self,
+    ) -> Arc<QueryStageScheduler<LogicalPlanNode, PhysicalPlanNode>> {
+        self.scheduler.query_stage_scheduler()
+    }
 }
 
 #[derive(Clone)]

Reply via email to