This is an automated email from the ASF dual-hosted git repository.
thinkharderdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 0b8d99c3 Handle job resubmission (#586)
0b8d99c3 is described below
commit 0b8d99c3077b28c0c7d2d96d38796dd9c5afaf02
Author: Dan Harris <[email protected]>
AuthorDate: Sat Feb 4 11:14:45 2023 +0200
Handle job resubmission (#586)
* Handle job resubmission
* Make resubmission configurable and add test
* Fix debug log
---
ballista/scheduler/scheduler_config_spec.toml | 6 ++
ballista/scheduler/src/bin/main.rs | 2 +
ballista/scheduler/src/config.rs | 9 ++
ballista/scheduler/src/scheduler_server/event.rs | 1 +
ballista/scheduler/src/scheduler_server/mod.rs | 28 ++++--
.../src/scheduler_server/query_stage_scheduler.rs | 110 ++++++++++++++++++---
ballista/scheduler/src/state/executor_manager.rs | 2 +-
ballista/scheduler/src/test_utils.rs | 36 +++++--
8 files changed, 166 insertions(+), 28 deletions(-)
diff --git a/ballista/scheduler/scheduler_config_spec.toml
b/ballista/scheduler/scheduler_config_spec.toml
index 0bb609cf..a0397771 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -142,3 +142,9 @@ name = "log_rotation_policy"
type = "ballista_core::config::LogRotationPolicy"
doc = "Tracing log rotation policy, possible values: minutely, hourly, daily,
never. Default: daily"
default = "ballista_core::config::LogRotationPolicy::Daily"
+
+[[param]]
+name = "job_resubmit_interval_ms"
+type = "u64"
+default = "0"
+doc = "If job is not able to be scheduled on submission, wait for this
interval and resubmit. Default value of 0 indicates that job should not be
resubmitted"
diff --git a/ballista/scheduler/src/bin/main.rs
b/ballista/scheduler/src/bin/main.rs
index 62bfc083..b87bec71 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -129,6 +129,8 @@ async fn main() -> Result<()> {
finished_job_state_clean_up_interval_seconds: opt
.finished_job_state_clean_up_interval_seconds,
advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
+ job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0)
+ .then_some(opt.job_resubmit_interval_ms),
};
start_server(scheduler_name, config_backend, cluster_state, addr,
config).await?;
Ok(())
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 97f79787..020d1c83 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -37,6 +37,9 @@ pub struct SchedulerConfig {
pub finished_job_state_clean_up_interval_seconds: u64,
/// The route endpoint for proxying flight sql results via scheduler
pub advertise_flight_sql_endpoint: Option<String>,
+ /// If provided, submitted jobs which do not have tasks scheduled will be
resubmitted after `job_resubmit_interval_ms`
+ /// milliseconds
+ pub job_resubmit_interval_ms: Option<u64>,
}
impl Default for SchedulerConfig {
@@ -48,6 +51,7 @@ impl Default for SchedulerConfig {
finished_job_data_clean_up_interval_seconds: 300,
finished_job_state_clean_up_interval_seconds: 3600,
advertise_flight_sql_endpoint: None,
+ job_resubmit_interval_ms: None,
}
}
}
@@ -95,6 +99,11 @@ impl SchedulerConfig {
self.executor_slots_policy = policy;
self
}
+
+ pub fn with_job_resubmit_interval_ms(mut self, interval_ms: u64) -> Self {
+ self.job_resubmit_interval_ms = Some(interval_ms);
+ self
+ }
}
// an enum used to configure the executor slots policy
diff --git a/ballista/scheduler/src/scheduler_server/event.rs
b/ballista/scheduler/src/scheduler_server/event.rs
index 74643535..21ce7fbb 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -37,6 +37,7 @@ pub enum QueryStageSchedulerEvent {
job_id: String,
queued_at: u64,
submitted_at: u64,
+ resubmit: bool,
},
// For a job which failed during planning
JobPlanningFailed {
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs
b/ballista/scheduler/src/scheduler_server/mod.rs
index 96f463a5..8966d92c 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -54,7 +54,7 @@ pub mod externalscaler {
pub mod event;
mod external_scaler;
mod grpc;
-mod query_stage_scheduler;
+pub(crate) mod query_stage_scheduler;
pub(crate) type SessionBuilder = fn(SessionConfig) -> SessionState;
@@ -84,8 +84,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
scheduler_name.clone(),
config.clone(),
));
- let query_stage_scheduler =
- Arc::new(QueryStageScheduler::new(state.clone(),
metrics_collector));
+ let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
+ state.clone(),
+ metrics_collector,
+ config.job_resubmit_interval_ms,
+ ));
let query_stage_event_loop = EventLoop::new(
"query_stage".to_owned(),
config.event_loop_buffer_size as usize,
@@ -118,8 +121,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
scheduler_name.clone(),
config.clone(),
));
- let query_stage_scheduler =
- Arc::new(QueryStageScheduler::new(state.clone(),
metrics_collector));
+ let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
+ state.clone(),
+ metrics_collector,
+ config.job_resubmit_interval_ms,
+ ));
let query_stage_event_loop = EventLoop::new(
"query_stage".to_owned(),
config.event_loop_buffer_size as usize,
@@ -154,8 +160,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
config.clone(),
task_launcher,
));
- let query_stage_scheduler =
- Arc::new(QueryStageScheduler::new(state.clone(),
metrics_collector));
+ let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
+ state.clone(),
+ metrics_collector,
+ config.job_resubmit_interval_ms,
+ ));
let query_stage_event_loop = EventLoop::new(
"query_stage".to_owned(),
config.event_loop_buffer_size as usize,
@@ -179,6 +188,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
Ok(())
}
+ #[cfg(test)]
+ pub(crate) fn query_stage_scheduler(&self) -> Arc<QueryStageScheduler<T,
U>> {
+ self.query_stage_scheduler.clone()
+ }
+
pub(crate) fn pending_tasks(&self) -> usize {
self.query_stage_scheduler.pending_tasks()
}
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 380fdf34..25454188 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -17,6 +17,7 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
+use std::time::Duration;
use async_trait::async_trait;
use log::{debug, error, info};
@@ -42,17 +43,20 @@ pub(crate) struct QueryStageScheduler<
state: Arc<SchedulerState<T, U>>,
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
pending_tasks: AtomicUsize,
+ job_resubmit_interval_ms: Option<u64>,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
QueryStageScheduler<T, U> {
pub(crate) fn new(
state: Arc<SchedulerState<T, U>>,
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
+ job_resubmit_interval_ms: Option<u64>,
) -> Self {
Self {
state,
metrics_collector,
pending_tasks: AtomicUsize::default(),
+ job_resubmit_interval_ms,
}
}
@@ -119,6 +123,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
job_id,
queued_at,
submitted_at: timestamp_millis(),
+ resubmit: false,
}
};
tx_event
@@ -132,11 +137,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
job_id,
queued_at,
submitted_at,
+ resubmit,
} => {
- self.metrics_collector
- .record_submitted(&job_id, queued_at, submitted_at);
+ if !resubmit {
+ self.metrics_collector.record_submitted(
+ &job_id,
+ queued_at,
+ submitted_at,
+ );
+
+ info!("Job {} submitted", job_id);
+ } else {
+ debug!("Job {} resubmitted", job_id);
+ }
- info!("Job {} submitted", job_id);
if self.state.config.is_push_staged_scheduling() {
let available_tasks = self
.state
@@ -153,17 +167,42 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
.map(|res| res.assign(job_id.clone()))
.collect();
- debug!(
- "Reserved {} task slots for submitted job {}",
- reservations.len(),
- job_id
- );
+ if reservations.is_empty() &&
self.job_resubmit_interval_ms.is_some()
+ {
+ let wait_ms = self.job_resubmit_interval_ms.unwrap();
- tx_event
-
.post_event(QueryStageSchedulerEvent::ReservationOffering(
- reservations,
- ))
- .await?;
+ debug!(
+ "No task slots reserved for job {job_id},
resubmitting after {wait_ms}ms"
+ );
+
+ tokio::task::spawn(async move {
+
tokio::time::sleep(Duration::from_millis(wait_ms)).await;
+
+ if let Err(e) = tx_event
+
.post_event(QueryStageSchedulerEvent::JobSubmitted {
+ job_id,
+ queued_at,
+ submitted_at,
+ resubmit: true,
+ })
+ .await
+ {
+ error!("error resubmitting job: {}", e);
+ }
+ });
+ } else {
+ debug!(
+ "Reserved {} task slots for submitted job {}",
+ reservations.len(),
+ job_id
+ );
+
+ tx_event
+
.post_event(QueryStageSchedulerEvent::ReservationOffering(
+ reservations,
+ ))
+ .await?;
+ }
}
}
QueryStageSchedulerEvent::JobPlanningFailed {
@@ -314,15 +353,60 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
#[cfg(test)]
mod tests {
use crate::config::SchedulerConfig;
+ use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::test_utils::{await_condition, SchedulerTest,
TestMetricsCollector};
use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::error::Result;
+ use ballista_core::event_loop::EventAction;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::{col, sum, LogicalPlan};
use datafusion::test_util::scan_empty_with_partitions;
use std::sync::Arc;
use std::time::Duration;
+ #[tokio::test]
+ async fn test_job_resubmit() -> Result<()> {
+ let plan = test_plan(10);
+
+ let metrics_collector = Arc::new(TestMetricsCollector::default());
+
+ // Set resubmit interval of 1ms
+ let mut test = SchedulerTest::new(
+ SchedulerConfig::default()
+ .with_job_resubmit_interval_ms(1)
+ .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
+ metrics_collector.clone(),
+ 0,
+ 0,
+ None,
+ )
+ .await?;
+
+ test.submit("job-id", "job-name", &plan).await?;
+
+ let query_stage_scheduler = test.query_stage_scheduler();
+
+ let (tx, mut rx) =
tokio::sync::mpsc::channel::<QueryStageSchedulerEvent>(10);
+
+ let event = QueryStageSchedulerEvent::JobSubmitted {
+ job_id: "job-id".to_string(),
+ queued_at: 0,
+ submitted_at: 0,
+ resubmit: false,
+ };
+
+ query_stage_scheduler.on_receive(event, &tx, &rx).await?;
+
+ let next_event = rx.recv().await.unwrap();
+
+ assert!(matches!(
+ next_event,
+ QueryStageSchedulerEvent::JobSubmitted { job_id, resubmit, .. } if
job_id == "job-id" && resubmit
+ ));
+
+ Ok(())
+ }
+
#[tokio::test]
async fn test_pending_task_metric() -> Result<()> {
let plan = test_plan(10);
diff --git a/ballista/scheduler/src/state/executor_manager.rs
b/ballista/scheduler/src/state/executor_manager.rs
index 8f0a810b..38c8da68 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -163,7 +163,7 @@ impl ExecutorManager {
} else {
let alive_executors = self.get_alive_executors_within_one_minute();
- println!("Alive executors: {alive_executors:?}");
+ debug!("Alive executors: {alive_executors:?}");
self.cluster_state
.reserve_slots(n, self.task_distribution,
Some(alive_executors))
diff --git a/ballista/scheduler/src/test_utils.rs
b/ballista/scheduler/src/test_utils.rs
index 68d78a75..d5465f00 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -51,6 +51,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::CsvReadOptions;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
use crate::state::backend::cluster::DefaultClusterState;
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
use parking_lot::Mutex;
@@ -384,13 +385,16 @@ impl SchedulerTest {
let state_storage = Arc::new(SledClient::try_new_temporary()?);
let cluster_state =
Arc::new(DefaultClusterState::new(state_storage.clone()));
- let ballista_config = BallistaConfig::builder()
- .set(
- BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
- format!("{}", num_executors *
task_slots_per_executor).as_str(),
- )
- .build()
- .expect("creating BallistaConfig");
+ let ballista_config = if num_executors > 0 && task_slots_per_executor
> 0 {
+ BallistaConfig::builder()
+ .set(
+ BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+ format!("{}", num_executors *
task_slots_per_executor).as_str(),
+ )
+ .build()?
+ } else {
+ BallistaConfig::builder().build()?
+ };
let runner = runner.unwrap_or_else(|| Arc::new(default_task_runner()));
@@ -475,6 +479,7 @@ impl SchedulerTest {
job_name: &str,
plan: &LogicalPlan,
) -> Result<()> {
+ println!("{:?}", self.ballista_config);
let ctx = self
.scheduler
.state
@@ -489,6 +494,17 @@ impl SchedulerTest {
Ok(())
}
+ pub async fn post_scheduler_event(
+ &self,
+ event: QueryStageSchedulerEvent,
+ ) -> Result<()> {
+ self.scheduler
+ .query_stage_event_loop
+ .get_sender()?
+ .post_event(event)
+ .await
+ }
+
pub async fn tick(&mut self) -> Result<()> {
if let Some(receiver) = self.status_receiver.as_mut() {
if let Some((executor_id, status)) = receiver.recv().await {
@@ -596,6 +612,12 @@ impl SchedulerTest {
final_status
}
+
+ pub(crate) fn query_stage_scheduler(
+ &self,
+ ) -> Arc<QueryStageScheduler<LogicalPlanNode, PhysicalPlanNode>> {
+ self.scheduler.query_stage_scheduler()
+ }
}
#[derive(Clone)]