This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 2f0f27c5 Make last_seen_ts_threshold for getting alive executor at the
scheduler side larger than the heartbeat time interval (#786)
2f0f27c5 is described below
commit 2f0f27c56d46a14325dc5e3f3c041679c55022e7
Author: yahoNanJing <[email protected]>
AuthorDate: Tue May 30 10:02:01 2023 +0800
Make last_seen_ts_threshold for getting alive executor at the scheduler
side larger than the heartbeat time interval (#786)
Co-authored-by: yangzhong <[email protected]>
---
ballista/core/src/utils.rs | 13 +++-
ballista/executor/executor_config_spec.toml | 8 ++-
ballista/executor/src/bin/main.rs | 1 +
ballista/executor/src/executor_process.rs | 14 ++--
ballista/executor/src/executor_server.rs | 10 ++-
ballista/scheduler/scheduler_config_spec.toml | 14 +++-
ballista/scheduler/src/bin/main.rs | 5 +-
ballista/scheduler/src/config.rs | 6 ++
ballista/scheduler/src/scheduler_process.rs | 3 +-
ballista/scheduler/src/scheduler_server/grpc.rs | 42 +++++------
ballista/scheduler/src/scheduler_server/mod.rs | 36 ++++------
.../src/scheduler_server/query_stage_scheduler.rs | 21 +++---
ballista/scheduler/src/standalone.rs | 3 +-
ballista/scheduler/src/state/executor_manager.rs | 83 ++++++++--------------
ballista/scheduler/src/state/mod.rs | 38 +++++-----
ballista/scheduler/src/test_utils.rs | 2 +-
16 files changed, 152 insertions(+), 147 deletions(-)
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 7c02ea02..f3859386 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -61,7 +61,7 @@ use std::io::{BufWriter, Write};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fs::File, pin::Pin};
use tonic::codegen::StdError;
use tonic::transport::{Channel, Error, Server};
@@ -472,3 +472,14 @@ pub fn collect_plan_metrics(plan: &dyn ExecutionPlan) ->
Vec<MetricsSet> {
});
metrics_array
}
+
+/// Given an interval in seconds, return the Unix epoch timestamp (in seconds)
that many seconds before now, saturating at zero if the interval exceeds the
current epoch time
+pub fn get_time_before(interval_seconds: u64) -> u64 {
+ let now_epoch_ts = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("Time went backwards");
+ now_epoch_ts
+ .checked_sub(Duration::from_secs(interval_seconds))
+ .unwrap_or_else(|| Duration::from_secs(0))
+ .as_secs()
+}
diff --git a/ballista/executor/executor_config_spec.toml
b/ballista/executor/executor_config_spec.toml
index ba515d18..cd5f8535 100644
--- a/ballista/executor/executor_config_spec.toml
+++ b/ballista/executor/executor_config_spec.toml
@@ -130,4 +130,10 @@ default = "ballista_core::config::LogRotationPolicy::Daily"
name = "grpc_server_max_decoding_message_size"
type = "u32"
default = "16777216"
-doc = "The maximum size of a decoded message at the grpc server side. Default:
16MB"
\ No newline at end of file
+doc = "The maximum size of a decoded message at the grpc server side. Default:
16MB"
+
+[[param]]
+name = "executor_heartbeat_interval_seconds"
+type = "u64"
+doc = "The heartbeat interval in seconds to the scheduler for push-based task
scheduling"
+default = "60"
\ No newline at end of file
diff --git a/ballista/executor/src/bin/main.rs
b/ballista/executor/src/bin/main.rs
index f5cca4c2..fe553473 100644
--- a/ballista/executor/src/bin/main.rs
+++ b/ballista/executor/src/bin/main.rs
@@ -79,6 +79,7 @@ async fn main() -> Result<()> {
job_data_ttl_seconds: opt.job_data_ttl_seconds,
job_data_clean_up_interval_seconds:
opt.job_data_clean_up_interval_seconds,
grpc_server_max_decoding_message_size:
opt.grpc_server_max_decoding_message_size,
+ executor_heartbeat_interval_seconds:
opt.executor_heartbeat_interval_seconds,
execution_engine: None,
};
diff --git a/ballista/executor/src/executor_process.rs
b/ballista/executor/src/executor_process.rs
index 257506f8..97acf40c 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -20,7 +20,7 @@
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
-use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+use std::time::{Duration, Instant, UNIX_EPOCH};
use std::{env, io};
use anyhow::{Context, Result};
@@ -51,7 +51,8 @@ use ballista_core::serde::protobuf::{
};
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::{
- create_grpc_client_connection, create_grpc_server,
with_object_store_provider,
+ create_grpc_client_connection, create_grpc_server, get_time_before,
+ with_object_store_provider,
};
use ballista_core::BALLISTA_VERSION;
@@ -85,6 +86,7 @@ pub struct ExecutorProcessConfig {
pub job_data_clean_up_interval_seconds: u64,
/// The maximum size of a decoded message at the grpc server side.
pub grpc_server_max_decoding_message_size: u32,
+ pub executor_heartbeat_interval_seconds: u64,
/// Optional execution engine to use to execute physical plans, will
default to
/// DataFusion if none is provided.
pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
@@ -515,11 +517,7 @@ async fn clean_all_shuffle_data(work_dir: &str) ->
Result<()> {
/// Determines if a directory contains files newer than the cutoff time.
/// If return true, it means the directory contains files newer than the
cutoff time. It satisfy the ttl and should not be deleted.
pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: u64) -> Result<bool> {
- let cutoff = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("Time went backwards")
- .checked_sub(Duration::from_secs(ttl_seconds))
- .expect("The cut off time went backwards");
+ let cutoff = get_time_before(ttl_seconds);
let mut to_check = vec![dir];
while let Some(dir) = to_check.pop() {
@@ -530,6 +528,7 @@ pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds:
u64) -> Result<bool> {
.modified()?
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
+ .as_secs()
> cutoff
{
return Ok(true);
@@ -544,6 +543,7 @@ pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds:
u64) -> Result<bool> {
.modified()?
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
+ .as_secs()
> cutoff
{
return Ok(true);
diff --git a/ballista/executor/src/executor_server.rs
b/ballista/executor/src/executor_server.rs
index e04fe75e..9102923f 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -145,7 +145,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static
+ AsExecutionPlan>(
// 3. Start Heartbeater loop
{
let heartbeater = Heartbeater::new(executor_server.clone());
- heartbeater.start(shutdown_noti);
+ heartbeater.start(shutdown_noti,
config.executor_heartbeat_interval_seconds);
}
// 4. Start TaskRunnerPool loop
@@ -471,7 +471,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> Heartbeater<T, U>
Self { executor_server }
}
- fn start(&self, shutdown_noti: &ShutdownNotifier) {
+ fn start(
+ &self,
+ shutdown_noti: &ShutdownNotifier,
+ executor_heartbeat_interval_seconds: u64,
+ ) {
let executor_server = self.executor_server.clone();
let mut heartbeat_shutdown = shutdown_noti.subscribe_for_shutdown();
let heartbeat_complete = shutdown_noti.shutdown_complete_tx.clone();
@@ -481,7 +485,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> Heartbeater<T, U>
while !heartbeat_shutdown.is_shutdown() {
executor_server.heartbeat().await;
tokio::select! {
- _ = tokio::time::sleep(Duration::from_millis(60000)) => {},
+ _ =
tokio::time::sleep(Duration::from_secs(executor_heartbeat_interval_seconds)) =>
{},
_ = heartbeat_shutdown.recv() => {
info!("Stop heartbeater");
drop(heartbeat_complete);
diff --git a/ballista/scheduler/scheduler_config_spec.toml
b/ballista/scheduler/scheduler_config_spec.toml
index 8811f4db..1aac653a 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -157,4 +157,16 @@ doc = "The maximum expected processing time of a scheduler
event (microseconds).
name = "grpc_server_max_decoding_message_size"
type = "u32"
default = "16777216"
-doc = "The maximum size of a decoded message at the grpc server side. Default:
16MB"
\ No newline at end of file
+doc = "The maximum size of a decoded message at the grpc server side. Default:
16MB"
+
+[[param]]
+name = "executor_timeout_seconds"
+type = "u64"
+doc = "The executor timeout in seconds. It should be longer than the
executor's heartbeat interval. Only after missing two or three consecutive
heartbeats from an executor is the executor marked as dead"
+default = "180"
+
+[[param]]
+name = "expire_dead_executor_interval_seconds"
+type = "u64"
+doc = "The interval in seconds at which to check for expired or dead executors"
+default = "15"
\ No newline at end of file
diff --git a/ballista/scheduler/src/bin/main.rs
b/ballista/scheduler/src/bin/main.rs
index 7c9ff11f..ec59b409 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -17,6 +17,7 @@
//! Ballista Rust scheduler binary.
+use std::sync::Arc;
use std::{env, io};
use anyhow::Result;
@@ -139,10 +140,12 @@ async fn main() -> Result<()> {
scheduler_event_expected_processing_duration: opt
.scheduler_event_expected_processing_duration,
grpc_server_max_decoding_message_size:
opt.grpc_server_max_decoding_message_size,
+ executor_timeout_seconds: opt.executor_timeout_seconds,
+ expire_dead_executor_interval_seconds:
opt.expire_dead_executor_interval_seconds,
};
let cluster = BallistaCluster::new_from_config(&config).await?;
- start_server(cluster, addr, config).await?;
+ start_server(cluster, addr, Arc::new(config)).await?;
Ok(())
}
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index f5e33ff7..0cb44a73 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -56,6 +56,10 @@ pub struct SchedulerConfig {
pub scheduler_event_expected_processing_duration: u64,
/// The maximum size of a decoded message at the grpc server side.
pub grpc_server_max_decoding_message_size: u32,
+ /// The executor timeout in seconds. It should be longer than the
executor's heartbeat interval.
+ pub executor_timeout_seconds: u64,
+ /// The interval to check expired or dead executors
+ pub expire_dead_executor_interval_seconds: u64,
}
impl Default for SchedulerConfig {
@@ -75,6 +79,8 @@ impl Default for SchedulerConfig {
executor_termination_grace_period: 0,
scheduler_event_expected_processing_duration: 0,
grpc_server_max_decoding_message_size: 16777216,
+ executor_timeout_seconds: 180,
+ expire_dead_executor_interval_seconds: 15,
}
}
}
diff --git a/ballista/scheduler/src/scheduler_process.rs
b/ballista/scheduler/src/scheduler_process.rs
index 9b7e220f..64ea3072 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -23,6 +23,7 @@ use hyper::{server::conn::AddrStream,
service::make_service_fn, Server};
use log::info;
use std::convert::Infallible;
use std::net::SocketAddr;
+use std::sync::Arc;
use tonic::transport::server::Connected;
use tower::Service;
@@ -44,7 +45,7 @@ use crate::scheduler_server::SchedulerServer;
pub async fn start_server(
cluster: BallistaCluster,
addr: SocketAddr,
- config: SchedulerConfig,
+ config: Arc<SchedulerConfig>,
) -> Result<()> {
info!(
"Ballista v{} Scheduler listening on {:?}",
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs
b/ballista/scheduler/src/scheduler_server/grpc.rs
index dfef9809..28d3f9cc 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -497,7 +497,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
event_sender,
&executor_id,
Some(reason),
- self.executor_termination_grace_period,
+ self.config.executor_termination_grace_period,
);
Ok(Response::new(ExecutorStoppedResult {}))
@@ -554,7 +554,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
#[cfg(all(test, feature = "sled"))]
mod test {
-
+ use std::sync::Arc;
use std::time::Duration;
use datafusion_proto::protobuf::LogicalPlanNode;
@@ -572,7 +572,6 @@ mod test {
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::BallistaCodec;
- use crate::state::executor_manager::DEFAULT_EXECUTOR_TIMEOUT_SECONDS;
use crate::state::SchedulerState;
use crate::test_utils::await_condition;
use crate::test_utils::test_cluster_context;
@@ -583,12 +582,13 @@ mod test {
async fn test_poll_work() -> Result<(), BallistaError> {
let cluster = test_cluster_context();
+ let config = SchedulerConfig::default();
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
cluster.clone(),
BallistaCodec::default(),
- SchedulerConfig::default(),
+ Arc::new(config),
default_metrics_collector().unwrap(),
);
scheduler.init().await?;
@@ -669,12 +669,13 @@ mod test {
async fn test_stop_executor() -> Result<(), BallistaError> {
let cluster = test_cluster_context();
+ let config =
SchedulerConfig::default().with_remove_executor_wait_secs(0);
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
cluster.clone(),
BallistaCodec::default(),
- SchedulerConfig::default().with_remove_executor_wait_secs(0),
+ Arc::new(config),
default_metrics_collector().unwrap(),
);
scheduler.init().await?;
@@ -740,14 +741,10 @@ mod test {
// executor should be marked to dead
assert!(is_stopped, "Executor not marked dead after 50ms");
- let active_executors = state
- .executor_manager
- .get_alive_executors_within_one_minute();
+ let active_executors = state.executor_manager.get_alive_executors();
assert!(active_executors.is_empty());
- let expired_executors = state
- .executor_manager
-
.get_expired_executors(scheduler.executor_termination_grace_period);
+ let expired_executors = state.executor_manager.get_expired_executors();
assert!(expired_executors.is_empty());
Ok(())
@@ -757,12 +754,13 @@ mod test {
async fn test_register_executor_in_heartbeat_service() -> Result<(),
BallistaError> {
let cluster = test_cluster_context();
+ let config = SchedulerConfig::default();
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
cluster,
BallistaCodec::default(),
- SchedulerConfig::default(),
+ Arc::new(config),
default_metrics_collector().unwrap(),
);
scheduler.init().await?;
@@ -809,12 +807,13 @@ mod test {
async fn test_expired_executor() -> Result<(), BallistaError> {
let cluster = test_cluster_context();
+ let config = SchedulerConfig::default();
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
cluster.clone(),
BallistaCodec::default(),
- SchedulerConfig::default(),
+ Arc::new(config),
default_metrics_collector().unwrap(),
);
scheduler.init().await?;
@@ -869,26 +868,23 @@ mod test {
.expect("Received error response")
.into_inner();
- let active_executors = state
- .executor_manager
- .get_alive_executors_within_one_minute();
+ let active_executors = state.executor_manager.get_alive_executors();
assert_eq!(active_executors.len(), 1);
- let expired_executors = state
- .executor_manager
-
.get_expired_executors(scheduler.executor_termination_grace_period);
+ let expired_executors = state.executor_manager.get_expired_executors();
assert!(expired_executors.is_empty());
// simulate the heartbeat timeout
-
tokio::time::sleep(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS)).await;
+ tokio::time::sleep(Duration::from_secs(
+ scheduler.config.executor_timeout_seconds,
+ ))
+ .await;
tokio::time::sleep(Duration::from_secs(3)).await;
// executor should be marked to dead
assert!(state.executor_manager.is_dead_executor("abc"));
- let active_executors = state
- .executor_manager
- .get_alive_executors_within_one_minute();
+ let active_executors = state.executor_manager.get_alive_executors();
assert!(active_executors.is_empty());
Ok(())
}
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs
b/ballista/scheduler/src/scheduler_server/mod.rs
index bb0e9b85..86893d19 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -38,10 +38,7 @@ use log::{error, warn};
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
-use crate::state::executor_manager::{
- ExecutorManager, ExecutorReservation, DEFAULT_EXECUTOR_TIMEOUT_SECONDS,
- EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS,
-};
+use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
use crate::state::task_manager::TaskLauncher;
use crate::state::SchedulerState;
@@ -66,7 +63,7 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U:
'static + AsExecutionP
pub state: Arc<SchedulerState<T, U>>,
pub(crate) query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
query_stage_scheduler: Arc<QueryStageScheduler<T, U>>,
- executor_termination_grace_period: u64,
+ config: Arc<SchedulerConfig>,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
SchedulerServer<T, U> {
@@ -74,7 +71,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
scheduler_name: String,
cluster: BallistaCluster,
codec: BallistaCodec<T, U>,
- config: SchedulerConfig,
+ config: Arc<SchedulerConfig>,
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
) -> Self {
let state = Arc::new(SchedulerState::new(
@@ -86,8 +83,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
state.clone(),
metrics_collector,
- config.job_resubmit_interval_ms,
- config.scheduler_event_expected_processing_duration,
+ config.clone(),
));
let query_stage_event_loop = EventLoop::new(
"query_stage".to_owned(),
@@ -101,7 +97,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
state,
query_stage_event_loop,
query_stage_scheduler,
- executor_termination_grace_period:
config.executor_termination_grace_period,
+ config,
}
}
@@ -110,7 +106,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
scheduler_name: String,
cluster: BallistaCluster,
codec: BallistaCodec<T, U>,
- config: SchedulerConfig,
+ config: Arc<SchedulerConfig>,
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
task_launcher: Arc<dyn TaskLauncher>,
) -> Self {
@@ -124,8 +120,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
state.clone(),
metrics_collector,
- config.job_resubmit_interval_ms,
- config.scheduler_event_expected_processing_duration,
+ config.clone(),
));
let query_stage_event_loop = EventLoop::new(
"query_stage".to_owned(),
@@ -139,7 +134,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
state,
query_stage_event_loop,
query_stage_scheduler,
- executor_termination_grace_period:
config.executor_termination_grace_period,
+ config,
}
}
@@ -224,12 +219,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
fn expire_dead_executors(&self) -> Result<()> {
let state = self.state.clone();
let event_sender = self.query_stage_event_loop.get_sender()?;
- let termination_grace_period = self.executor_termination_grace_period;
tokio::task::spawn(async move {
loop {
- let expired_executors = state
- .executor_manager
- .get_expired_executors(termination_grace_period);
+ let expired_executors =
state.executor_manager.get_expired_executors();
for expired in expired_executors {
let executor_id = expired.executor_id.clone();
let executor_manager = state.executor_manager.clone();
@@ -246,11 +238,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
let stop_reason = if terminating {
format!(
- "TERMINATING executor {executor_id} heartbeat timed
out after {termination_grace_period}s"
+ "TERMINATING executor {executor_id} heartbeat timed
out after {}s", state.config.executor_termination_grace_period,
)
} else {
format!(
- "ACTIVE executor {executor_id} heartbeat timed out
after {DEFAULT_EXECUTOR_TIMEOUT_SECONDS}s",
+ "ACTIVE executor {executor_id} heartbeat timed out
after {}s",
+ state.config.executor_timeout_seconds,
)
};
@@ -296,7 +289,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
}
}
tokio::time::sleep(Duration::from_secs(
- EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS,
+ state.config.expire_dead_executor_interval_seconds,
))
.await;
}
@@ -687,12 +680,13 @@ mod test {
) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
let cluster = test_cluster_context();
+ let config =
SchedulerConfig::default().with_scheduler_policy(scheduling_policy);
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
cluster,
BallistaCodec::default(),
-
SchedulerConfig::default().with_scheduler_policy(scheduling_policy),
+ Arc::new(config),
Arc::new(TestMetricsCollector::default()),
);
scheduler.init().await?;
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index eb6f7504..1cd611b8 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -25,6 +25,7 @@ use log::{debug, error, info, warn};
use ballista_core::error::{BallistaError, Result};
use ballista_core::event_loop::{EventAction, EventSender};
+use crate::config::SchedulerConfig;
use crate::metrics::SchedulerMetricsCollector;
use crate::scheduler_server::timestamp_millis;
use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -44,23 +45,20 @@ pub(crate) struct QueryStageScheduler<
state: Arc<SchedulerState<T, U>>,
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
pending_tasks: AtomicUsize,
- job_resubmit_interval_ms: Option<u64>,
- event_expected_processing_duration: u64,
+ config: Arc<SchedulerConfig>,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
QueryStageScheduler<T, U> {
pub(crate) fn new(
state: Arc<SchedulerState<T, U>>,
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
- job_resubmit_interval_ms: Option<u64>,
- event_expected_processing_duration: u64,
+ config: Arc<SchedulerConfig>,
) -> Self {
Self {
state,
metrics_collector,
pending_tasks: AtomicUsize::default(),
- job_resubmit_interval_ms,
- event_expected_processing_duration,
+ config,
}
}
@@ -98,7 +96,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
_rx_event: &mpsc::Receiver<QueryStageSchedulerEvent>,
) -> Result<()> {
let mut time_recorder = None;
- if self.event_expected_processing_duration > 0 {
+ if self.config.scheduler_event_expected_processing_duration > 0 {
time_recorder = Some((Instant::now(), event.clone()));
};
let tx_event = EventSender::new(tx_event.clone());
@@ -193,9 +191,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
.map(|res| res.assign(job_id.clone()))
.collect();
- if reservations.is_empty() &&
self.job_resubmit_interval_ms.is_some()
+ if reservations.is_empty()
+ && self.config.job_resubmit_interval_ms.is_some()
{
- let wait_ms = self.job_resubmit_interval_ms.unwrap();
+ let wait_ms =
self.config.job_resubmit_interval_ms.unwrap();
debug!(
"No task slots reserved for job {job_id},
resubmitting after {wait_ms}ms"
@@ -377,8 +376,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
}
if let Some((start, ec)) = time_recorder {
let duration = start.elapsed();
- if duration.ge(&core::time::Duration::from_micros(
- self.event_expected_processing_duration,
+ if duration.ge(&Duration::from_micros(
+ self.config.scheduler_event_expected_processing_duration,
)) {
warn!(
"[METRICS] {:?} event cost {:?} us!",
diff --git a/ballista/scheduler/src/standalone.rs
b/ballista/scheduler/src/standalone.rs
index 5334ae1d..cb74b2bf 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -29,6 +29,7 @@ use datafusion_proto::protobuf::LogicalPlanNode;
use datafusion_proto::protobuf::PhysicalPlanNode;
use log::info;
use std::net::SocketAddr;
+use std::sync::Arc;
use tokio::net::TcpListener;
pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
@@ -46,7 +47,7 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr>
{
"localhost:50050".to_owned(),
cluster,
BallistaCodec::default(),
- SchedulerConfig::default(),
+ Arc::new(SchedulerConfig::default()),
metrics_collector,
);
diff --git a/ballista/scheduler/src/state/executor_manager.rs
b/ballista/scheduler/src/state/executor_manager.rs
index 63451014..7a6bf461 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use std::time::Duration;
#[cfg(not(test))]
use ballista_core::error::BallistaError;
@@ -23,7 +23,7 @@ use ballista_core::error::Result;
use ballista_core::serde::protobuf;
use crate::cluster::ClusterState;
-use crate::config::TaskDistribution;
+use crate::config::SchedulerConfig;
use crate::state::execution_graph::RunningTaskInfo;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
@@ -31,7 +31,7 @@ use ballista_core::serde::protobuf::{
executor_status, CancelTasksParams, ExecutorHeartbeat, RemoveJobDataParams,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
-use ballista_core::utils::create_grpc_client_connection;
+use ballista_core::utils::{create_grpc_client_connection, get_time_before};
use dashmap::DashMap;
use log::{debug, error, info, warn};
use std::collections::{HashMap, HashSet};
@@ -76,31 +76,21 @@ impl ExecutorReservation {
}
}
-// TODO move to configuration file
-/// Default executor timeout in seconds, it should be longer than executor's
heartbeat intervals.
-/// Only after missing two or tree consecutive heartbeats from a executor, the
executor is mark
-/// to be dead.
-pub const DEFAULT_EXECUTOR_TIMEOUT_SECONDS: u64 = 180;
-
-// TODO move to configuration file
-/// Interval check for expired or dead executors
-pub const EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS: u64 = 15;
-
#[derive(Clone)]
pub struct ExecutorManager {
- task_distribution: TaskDistribution,
cluster_state: Arc<dyn ClusterState>,
+ config: Arc<SchedulerConfig>,
clients: ExecutorClients,
}
impl ExecutorManager {
pub(crate) fn new(
cluster_state: Arc<dyn ClusterState>,
- task_distribution: TaskDistribution,
+ config: Arc<SchedulerConfig>,
) -> Self {
Self {
- task_distribution,
cluster_state,
+ config,
clients: Default::default(),
}
}
@@ -115,12 +105,12 @@ impl ExecutorManager {
/// for scheduling.
/// This operation is atomic, so if this method return an Err, no slots
have been reserved.
pub async fn reserve_slots(&self, n: u32) ->
Result<Vec<ExecutorReservation>> {
- let alive_executors = self.get_alive_executors_within_one_minute();
+ let alive_executors = self.get_alive_executors();
debug!("Alive executors: {alive_executors:?}");
self.cluster_state
- .reserve_slots(n, self.task_distribution, Some(alive_executors))
+ .reserve_slots(n, self.config.task_distribution,
Some(alive_executors))
.await
}
@@ -205,7 +195,7 @@ impl ExecutorManager {
/// Send rpc to Executors to clean up the job data
async fn clean_up_job_data_inner(&self, job_id: String) {
- let alive_executors = self.get_alive_executors_within_one_minute();
+ let alive_executors = self.get_alive_executors();
for executor in alive_executors {
let job_id_clone = job_id.to_owned();
if let Ok(mut client) = self.get_client(&executor).await {
@@ -391,10 +381,9 @@ impl ExecutorManager {
/// Retrieve the set of all executor IDs where the executor has been
observed in the last
/// `last_seen_ts_threshold` seconds.
- pub(crate) fn get_alive_executors(
- &self,
- last_seen_ts_threshold: u64,
- ) -> HashSet<String> {
+ pub(crate) fn get_alive_executors(&self) -> HashSet<String> {
+ let last_seen_ts_threshold =
+ get_time_before(self.config.executor_timeout_seconds);
self.cluster_state
.executor_heartbeats()
.iter()
@@ -414,24 +403,13 @@ impl ExecutorManager {
}
/// Return a list of expired executors
- pub(crate) fn get_expired_executors(
- &self,
- termination_grace_period: u64,
- ) -> Vec<ExecutorHeartbeat> {
- let now_epoch_ts = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("Time went backwards");
+ pub(crate) fn get_expired_executors(&self) -> Vec<ExecutorHeartbeat> {
// Threshold for last heartbeat from Active executor before marking
dead
- let last_seen_threshold = now_epoch_ts
- .checked_sub(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS))
- .unwrap_or_else(|| Duration::from_secs(0))
- .as_secs();
+ let last_seen_threshold =
get_time_before(self.config.executor_timeout_seconds);
// Threshold for last heartbeat for Fenced executor before marking dead
- let termination_wait_threshold = now_epoch_ts
- .checked_sub(Duration::from_secs(termination_grace_period))
- .unwrap_or_else(|| Duration::from_secs(0))
- .as_secs();
+ let termination_wait_threshold =
+ get_time_before(self.config.executor_termination_grace_period);
self.cluster_state
.executor_heartbeats()
@@ -455,22 +433,12 @@ impl ExecutorManager {
})
.collect::<Vec<_>>()
}
-
- pub(crate) fn get_alive_executors_within_one_minute(&self) ->
HashSet<String> {
- let now_epoch_ts = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("Time went backwards");
- let last_seen_threshold = now_epoch_ts
- .checked_sub(Duration::from_secs(60))
- .unwrap_or_else(|| Duration::from_secs(0));
- self.get_alive_executors(last_seen_threshold.as_secs())
- }
}
#[cfg(test)]
mod test {
-
- use crate::config::TaskDistribution;
+ use crate::config::{SchedulerConfig, TaskDistribution};
+ use std::sync::Arc;
use crate::scheduler_server::timestamp_secs;
use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
@@ -495,8 +463,9 @@ mod test {
) -> Result<()> {
let cluster = test_cluster_context();
+ let config =
SchedulerConfig::default().with_task_distribution(task_distribution);
let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), task_distribution);
+ ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
let executors = test_executors(10, 4);
@@ -543,8 +512,9 @@ mod test {
) -> Result<()> {
let cluster = test_cluster_context();
+ let config =
SchedulerConfig::default().with_task_distribution(task_distribution);
let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), task_distribution);
+ ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
let executors = test_executors(10, 4);
@@ -598,9 +568,10 @@ mod test {
let executors = test_executors(10, 4);
+ let config =
SchedulerConfig::default().with_task_distribution(task_distribution);
let cluster = test_cluster_context();
let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), task_distribution);
+ ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
for (executor_metadata, executor_data) in executors {
executor_manager
@@ -646,8 +617,9 @@ mod test {
) -> Result<()> {
let cluster = test_cluster_context();
+ let config =
SchedulerConfig::default().with_task_distribution(task_distribution);
let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), task_distribution);
+ ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
let executors = test_executors(10, 4);
@@ -680,8 +652,9 @@ mod test {
) -> Result<()> {
let cluster = test_cluster_context();
+ let config =
SchedulerConfig::default().with_task_distribution(task_distribution);
let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), task_distribution);
+ ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
// Setup two executors initially
let executors = test_executors(2, 4);
diff --git a/ballista/scheduler/src/state/mod.rs
b/ballista/scheduler/src/state/mod.rs
index c37dd280..2ad2d707 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -91,33 +91,20 @@ pub struct SchedulerState<T: 'static + AsLogicalPlan, U:
'static + AsExecutionPl
pub task_manager: TaskManager<T, U>,
pub session_manager: SessionManager,
pub codec: BallistaCodec<T, U>,
- pub config: SchedulerConfig,
+ pub config: Arc<SchedulerConfig>,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
SchedulerState<T, U> {
- #[cfg(test)]
- pub fn new_with_default_scheduler_name(
- cluster: BallistaCluster,
- codec: BallistaCodec<T, U>,
- ) -> Self {
- SchedulerState::new(
- cluster,
- codec,
- "localhost:50050".to_owned(),
- SchedulerConfig::default(),
- )
- }
-
pub fn new(
cluster: BallistaCluster,
codec: BallistaCodec<T, U>,
scheduler_name: String,
- config: SchedulerConfig,
+ config: Arc<SchedulerConfig>,
) -> Self {
Self {
executor_manager: ExecutorManager::new(
cluster.cluster_state(),
- config.task_distribution,
+ config.clone(),
),
task_manager: TaskManager::new(
cluster.job_state(),
@@ -130,18 +117,27 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
}
}
+ #[cfg(test)]
+ pub fn new_with_default_scheduler_name(
+ cluster: BallistaCluster,
+ codec: BallistaCodec<T, U>,
+ ) -> Self {
+ let config = Arc::new(SchedulerConfig::default());
+ SchedulerState::new(cluster, codec, "localhost:50050".to_owned(),
config)
+ }
+
#[allow(dead_code)]
pub(crate) fn new_with_task_launcher(
cluster: BallistaCluster,
codec: BallistaCodec<T, U>,
scheduler_name: String,
- config: SchedulerConfig,
+ config: Arc<SchedulerConfig>,
dispatcher: Arc<dyn TaskLauncher>,
) -> Self {
Self {
executor_manager: ExecutorManager::new(
cluster.cluster_state(),
- config.task_distribution,
+ config.clone(),
),
task_manager: TaskManager::with_launcher(
cluster.job_state(),
@@ -469,12 +465,13 @@ mod test {
.set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
.build()?;
+ let scheduler_config = SchedulerConfig::default();
let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
Arc::new(SchedulerState::new_with_task_launcher(
test_cluster_context(),
BallistaCodec::default(),
TEST_SCHEDULER_NAME.into(),
- SchedulerConfig::default(),
+ Arc::new(scheduler_config),
Arc::new(BlackholeTaskLauncher::default()),
));
@@ -569,12 +566,13 @@ mod test {
.set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
.build()?;
+ let scheduler_config = SchedulerConfig::default();
let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
Arc::new(SchedulerState::new_with_task_launcher(
test_cluster_context(),
BallistaCodec::default(),
TEST_SCHEDULER_NAME.into(),
- SchedulerConfig::default(),
+ Arc::new(scheduler_config),
Arc::new(BlackholeTaskLauncher::default()),
));
diff --git a/ballista/scheduler/src/test_utils.rs
b/ballista/scheduler/src/test_utils.rs
index 217c96c0..beaabca4 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -425,7 +425,7 @@ impl SchedulerTest {
"localhost:50050".to_owned(),
cluster,
BallistaCodec::default(),
- config,
+ Arc::new(config),
metrics_collector,
Arc::new(launcher),
);