This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f0f27c5 Make last_seen_ts_threshold for getting alive executor at the 
scheduler side larger than the heartbeat time interval (#786)
2f0f27c5 is described below

commit 2f0f27c56d46a14325dc5e3f3c041679c55022e7
Author: yahoNanJing <[email protected]>
AuthorDate: Tue May 30 10:02:01 2023 +0800

    Make last_seen_ts_threshold for getting alive executor at the scheduler 
side larger than the heartbeat time interval (#786)
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/core/src/utils.rs                         | 13 +++-
 ballista/executor/executor_config_spec.toml        |  8 ++-
 ballista/executor/src/bin/main.rs                  |  1 +
 ballista/executor/src/executor_process.rs          | 14 ++--
 ballista/executor/src/executor_server.rs           | 10 ++-
 ballista/scheduler/scheduler_config_spec.toml      | 14 +++-
 ballista/scheduler/src/bin/main.rs                 |  5 +-
 ballista/scheduler/src/config.rs                   |  6 ++
 ballista/scheduler/src/scheduler_process.rs        |  3 +-
 ballista/scheduler/src/scheduler_server/grpc.rs    | 42 +++++------
 ballista/scheduler/src/scheduler_server/mod.rs     | 36 ++++------
 .../src/scheduler_server/query_stage_scheduler.rs  | 21 +++---
 ballista/scheduler/src/standalone.rs               |  3 +-
 ballista/scheduler/src/state/executor_manager.rs   | 83 ++++++++--------------
 ballista/scheduler/src/state/mod.rs                | 38 +++++-----
 ballista/scheduler/src/test_utils.rs               |  2 +-
 16 files changed, 152 insertions(+), 147 deletions(-)

diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 7c02ea02..f3859386 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -61,7 +61,7 @@ use std::io::{BufWriter, Write};
 use std::marker::PhantomData;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use std::{fs::File, pin::Pin};
 use tonic::codegen::StdError;
 use tonic::transport::{Channel, Error, Server};
@@ -472,3 +472,14 @@ pub fn collect_plan_metrics(plan: &dyn ExecutionPlan) -> 
Vec<MetricsSet> {
     });
     metrics_array
 }
+
+/// Given an interval in seconds, get the time in seconds before now
+pub fn get_time_before(interval_seconds: u64) -> u64 {
+    let now_epoch_ts = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("Time went backwards");
+    now_epoch_ts
+        .checked_sub(Duration::from_secs(interval_seconds))
+        .unwrap_or_else(|| Duration::from_secs(0))
+        .as_secs()
+}
diff --git a/ballista/executor/executor_config_spec.toml 
b/ballista/executor/executor_config_spec.toml
index ba515d18..cd5f8535 100644
--- a/ballista/executor/executor_config_spec.toml
+++ b/ballista/executor/executor_config_spec.toml
@@ -130,4 +130,10 @@ default = "ballista_core::config::LogRotationPolicy::Daily"
 name = "grpc_server_max_decoding_message_size"
 type = "u32"
 default = "16777216"
-doc = "The maximum size of a decoded message at the grpc server side. Default: 
16MB"
\ No newline at end of file
+doc = "The maximum size of a decoded message at the grpc server side. Default: 
16MB"
+
+[[param]]
+name = "executor_heartbeat_interval_seconds"
+type = "u64"
+doc = "The heartbeat interval in seconds to the scheduler for push-based task 
scheduling"
+default = "60"
\ No newline at end of file
diff --git a/ballista/executor/src/bin/main.rs 
b/ballista/executor/src/bin/main.rs
index f5cca4c2..fe553473 100644
--- a/ballista/executor/src/bin/main.rs
+++ b/ballista/executor/src/bin/main.rs
@@ -79,6 +79,7 @@ async fn main() -> Result<()> {
         job_data_ttl_seconds: opt.job_data_ttl_seconds,
         job_data_clean_up_interval_seconds: 
opt.job_data_clean_up_interval_seconds,
         grpc_server_max_decoding_message_size: 
opt.grpc_server_max_decoding_message_size,
+        executor_heartbeat_interval_seconds: 
opt.executor_heartbeat_interval_seconds,
         execution_engine: None,
     };
 
diff --git a/ballista/executor/src/executor_process.rs 
b/ballista/executor/src/executor_process.rs
index 257506f8..97acf40c 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -20,7 +20,7 @@
 use std::net::SocketAddr;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
-use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+use std::time::{Duration, Instant, UNIX_EPOCH};
 use std::{env, io};
 
 use anyhow::{Context, Result};
@@ -51,7 +51,8 @@ use ballista_core::serde::protobuf::{
 };
 use ballista_core::serde::BallistaCodec;
 use ballista_core::utils::{
-    create_grpc_client_connection, create_grpc_server, 
with_object_store_provider,
+    create_grpc_client_connection, create_grpc_server, get_time_before,
+    with_object_store_provider,
 };
 use ballista_core::BALLISTA_VERSION;
 
@@ -85,6 +86,7 @@ pub struct ExecutorProcessConfig {
     pub job_data_clean_up_interval_seconds: u64,
     /// The maximum size of a decoded message at the grpc server side.
     pub grpc_server_max_decoding_message_size: u32,
+    pub executor_heartbeat_interval_seconds: u64,
     /// Optional execution engine to use to execute physical plans, will 
default to
     /// DataFusion if none is provided.
     pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
@@ -515,11 +517,7 @@ async fn clean_all_shuffle_data(work_dir: &str) -> 
Result<()> {
 /// Determines if a directory contains files newer than the cutoff time.
 /// If return true, it means the directory contains files newer than the 
cutoff time. It satisfy the ttl and should not be deleted.
 pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: u64) -> Result<bool> {
-    let cutoff = SystemTime::now()
-        .duration_since(UNIX_EPOCH)
-        .expect("Time went backwards")
-        .checked_sub(Duration::from_secs(ttl_seconds))
-        .expect("The cut off time went backwards");
+    let cutoff = get_time_before(ttl_seconds);
 
     let mut to_check = vec![dir];
     while let Some(dir) = to_check.pop() {
@@ -530,6 +528,7 @@ pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: 
u64) -> Result<bool> {
             .modified()?
             .duration_since(UNIX_EPOCH)
             .expect("Time went backwards")
+            .as_secs()
             > cutoff
         {
             return Ok(true);
@@ -544,6 +543,7 @@ pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: 
u64) -> Result<bool> {
                 .modified()?
                 .duration_since(UNIX_EPOCH)
                 .expect("Time went backwards")
+                .as_secs()
                 > cutoff
             {
                 return Ok(true);
diff --git a/ballista/executor/src/executor_server.rs 
b/ballista/executor/src/executor_server.rs
index e04fe75e..9102923f 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -145,7 +145,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static 
+ AsExecutionPlan>(
     // 3. Start Heartbeater loop
     {
         let heartbeater = Heartbeater::new(executor_server.clone());
-        heartbeater.start(shutdown_noti);
+        heartbeater.start(shutdown_noti, 
config.executor_heartbeat_interval_seconds);
     }
 
     // 4. Start TaskRunnerPool loop
@@ -471,7 +471,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> Heartbeater<T, U>
         Self { executor_server }
     }
 
-    fn start(&self, shutdown_noti: &ShutdownNotifier) {
+    fn start(
+        &self,
+        shutdown_noti: &ShutdownNotifier,
+        executor_heartbeat_interval_seconds: u64,
+    ) {
         let executor_server = self.executor_server.clone();
         let mut heartbeat_shutdown = shutdown_noti.subscribe_for_shutdown();
         let heartbeat_complete = shutdown_noti.shutdown_complete_tx.clone();
@@ -481,7 +485,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> Heartbeater<T, U>
             while !heartbeat_shutdown.is_shutdown() {
                 executor_server.heartbeat().await;
                 tokio::select! {
-                    _ = tokio::time::sleep(Duration::from_millis(60000)) => {},
+                    _ = 
tokio::time::sleep(Duration::from_secs(executor_heartbeat_interval_seconds)) => 
{},
                     _ = heartbeat_shutdown.recv() => {
                         info!("Stop heartbeater");
                         drop(heartbeat_complete);
diff --git a/ballista/scheduler/scheduler_config_spec.toml 
b/ballista/scheduler/scheduler_config_spec.toml
index 8811f4db..1aac653a 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -157,4 +157,16 @@ doc = "The maximum expected processing time of a scheduler 
event (microseconds).
 name = "grpc_server_max_decoding_message_size"
 type = "u32"
 default = "16777216"
-doc = "The maximum size of a decoded message at the grpc server side. Default: 
16MB"
\ No newline at end of file
+doc = "The maximum size of a decoded message at the grpc server side. Default: 
16MB"
+
+[[param]]
+name = "executor_timeout_seconds"
+type = "u64"
+doc = "The executor timeout in seconds. It should be longer than executor's 
heartbeat intervals. Only after missing two or tree consecutive heartbeats from 
a executor, the executor is mark to be dead"
+default = "180"
+
+[[param]]
+name = "expire_dead_executor_interval_seconds"
+type = "u64"
+doc = "The interval to check expired or dead executors"
+default = "15"
\ No newline at end of file
diff --git a/ballista/scheduler/src/bin/main.rs 
b/ballista/scheduler/src/bin/main.rs
index 7c9ff11f..ec59b409 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -17,6 +17,7 @@
 
 //! Ballista Rust scheduler binary.
 
+use std::sync::Arc;
 use std::{env, io};
 
 use anyhow::Result;
@@ -139,10 +140,12 @@ async fn main() -> Result<()> {
         scheduler_event_expected_processing_duration: opt
             .scheduler_event_expected_processing_duration,
         grpc_server_max_decoding_message_size: 
opt.grpc_server_max_decoding_message_size,
+        executor_timeout_seconds: opt.executor_timeout_seconds,
+        expire_dead_executor_interval_seconds: 
opt.expire_dead_executor_interval_seconds,
     };
 
     let cluster = BallistaCluster::new_from_config(&config).await?;
 
-    start_server(cluster, addr, config).await?;
+    start_server(cluster, addr, Arc::new(config)).await?;
     Ok(())
 }
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index f5e33ff7..0cb44a73 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -56,6 +56,10 @@ pub struct SchedulerConfig {
     pub scheduler_event_expected_processing_duration: u64,
     /// The maximum size of a decoded message at the grpc server side.
     pub grpc_server_max_decoding_message_size: u32,
+    /// The executor timeout in seconds. It should be longer than executor's 
heartbeat intervals.
+    pub executor_timeout_seconds: u64,
+    /// The interval to check expired or dead executors
+    pub expire_dead_executor_interval_seconds: u64,
 }
 
 impl Default for SchedulerConfig {
@@ -75,6 +79,8 @@ impl Default for SchedulerConfig {
             executor_termination_grace_period: 0,
             scheduler_event_expected_processing_duration: 0,
             grpc_server_max_decoding_message_size: 16777216,
+            executor_timeout_seconds: 180,
+            expire_dead_executor_interval_seconds: 15,
         }
     }
 }
diff --git a/ballista/scheduler/src/scheduler_process.rs 
b/ballista/scheduler/src/scheduler_process.rs
index 9b7e220f..64ea3072 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -23,6 +23,7 @@ use hyper::{server::conn::AddrStream, 
service::make_service_fn, Server};
 use log::info;
 use std::convert::Infallible;
 use std::net::SocketAddr;
+use std::sync::Arc;
 use tonic::transport::server::Connected;
 use tower::Service;
 
@@ -44,7 +45,7 @@ use crate::scheduler_server::SchedulerServer;
 pub async fn start_server(
     cluster: BallistaCluster,
     addr: SocketAddr,
-    config: SchedulerConfig,
+    config: Arc<SchedulerConfig>,
 ) -> Result<()> {
     info!(
         "Ballista v{} Scheduler listening on {:?}",
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs 
b/ballista/scheduler/src/scheduler_server/grpc.rs
index dfef9809..28d3f9cc 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -497,7 +497,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
             event_sender,
             &executor_id,
             Some(reason),
-            self.executor_termination_grace_period,
+            self.config.executor_termination_grace_period,
         );
 
         Ok(Response::new(ExecutorStoppedResult {}))
@@ -554,7 +554,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
 
 #[cfg(all(test, feature = "sled"))]
 mod test {
-
+    use std::sync::Arc;
     use std::time::Duration;
 
     use datafusion_proto::protobuf::LogicalPlanNode;
@@ -572,7 +572,6 @@ mod test {
     use ballista_core::serde::scheduler::ExecutorSpecification;
     use ballista_core::serde::BallistaCodec;
 
-    use crate::state::executor_manager::DEFAULT_EXECUTOR_TIMEOUT_SECONDS;
     use crate::state::SchedulerState;
     use crate::test_utils::await_condition;
     use crate::test_utils::test_cluster_context;
@@ -583,12 +582,13 @@ mod test {
     async fn test_poll_work() -> Result<(), BallistaError> {
         let cluster = test_cluster_context();
 
+        let config = SchedulerConfig::default();
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
                 cluster.clone(),
                 BallistaCodec::default(),
-                SchedulerConfig::default(),
+                Arc::new(config),
                 default_metrics_collector().unwrap(),
             );
         scheduler.init().await?;
@@ -669,12 +669,13 @@ mod test {
     async fn test_stop_executor() -> Result<(), BallistaError> {
         let cluster = test_cluster_context();
 
+        let config = 
SchedulerConfig::default().with_remove_executor_wait_secs(0);
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
                 cluster.clone(),
                 BallistaCodec::default(),
-                SchedulerConfig::default().with_remove_executor_wait_secs(0),
+                Arc::new(config),
                 default_metrics_collector().unwrap(),
             );
         scheduler.init().await?;
@@ -740,14 +741,10 @@ mod test {
         // executor should be marked to dead
         assert!(is_stopped, "Executor not marked dead after 50ms");
 
-        let active_executors = state
-            .executor_manager
-            .get_alive_executors_within_one_minute();
+        let active_executors = state.executor_manager.get_alive_executors();
         assert!(active_executors.is_empty());
 
-        let expired_executors = state
-            .executor_manager
-            
.get_expired_executors(scheduler.executor_termination_grace_period);
+        let expired_executors = state.executor_manager.get_expired_executors();
         assert!(expired_executors.is_empty());
 
         Ok(())
@@ -757,12 +754,13 @@ mod test {
     async fn test_register_executor_in_heartbeat_service() -> Result<(), 
BallistaError> {
         let cluster = test_cluster_context();
 
+        let config = SchedulerConfig::default();
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
                 cluster,
                 BallistaCodec::default(),
-                SchedulerConfig::default(),
+                Arc::new(config),
                 default_metrics_collector().unwrap(),
             );
         scheduler.init().await?;
@@ -809,12 +807,13 @@ mod test {
     async fn test_expired_executor() -> Result<(), BallistaError> {
         let cluster = test_cluster_context();
 
+        let config = SchedulerConfig::default();
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
                 cluster.clone(),
                 BallistaCodec::default(),
-                SchedulerConfig::default(),
+                Arc::new(config),
                 default_metrics_collector().unwrap(),
             );
         scheduler.init().await?;
@@ -869,26 +868,23 @@ mod test {
             .expect("Received error response")
             .into_inner();
 
-        let active_executors = state
-            .executor_manager
-            .get_alive_executors_within_one_minute();
+        let active_executors = state.executor_manager.get_alive_executors();
         assert_eq!(active_executors.len(), 1);
 
-        let expired_executors = state
-            .executor_manager
-            
.get_expired_executors(scheduler.executor_termination_grace_period);
+        let expired_executors = state.executor_manager.get_expired_executors();
         assert!(expired_executors.is_empty());
 
         // simulate the heartbeat timeout
-        
tokio::time::sleep(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS)).await;
+        tokio::time::sleep(Duration::from_secs(
+            scheduler.config.executor_timeout_seconds,
+        ))
+        .await;
         tokio::time::sleep(Duration::from_secs(3)).await;
 
         // executor should be marked to dead
         assert!(state.executor_manager.is_dead_executor("abc"));
 
-        let active_executors = state
-            .executor_manager
-            .get_alive_executors_within_one_minute();
+        let active_executors = state.executor_manager.get_alive_executors();
         assert!(active_executors.is_empty());
         Ok(())
     }
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs 
b/ballista/scheduler/src/scheduler_server/mod.rs
index bb0e9b85..86893d19 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -38,10 +38,7 @@ use log::{error, warn};
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
 
-use crate::state::executor_manager::{
-    ExecutorManager, ExecutorReservation, DEFAULT_EXECUTOR_TIMEOUT_SECONDS,
-    EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS,
-};
+use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
 
 use crate::state::task_manager::TaskLauncher;
 use crate::state::SchedulerState;
@@ -66,7 +63,7 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionP
     pub state: Arc<SchedulerState<T, U>>,
     pub(crate) query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
     query_stage_scheduler: Arc<QueryStageScheduler<T, U>>,
-    executor_termination_grace_period: u64,
+    config: Arc<SchedulerConfig>,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> 
SchedulerServer<T, U> {
@@ -74,7 +71,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
         scheduler_name: String,
         cluster: BallistaCluster,
         codec: BallistaCodec<T, U>,
-        config: SchedulerConfig,
+        config: Arc<SchedulerConfig>,
         metrics_collector: Arc<dyn SchedulerMetricsCollector>,
     ) -> Self {
         let state = Arc::new(SchedulerState::new(
@@ -86,8 +83,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
         let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
             state.clone(),
             metrics_collector,
-            config.job_resubmit_interval_ms,
-            config.scheduler_event_expected_processing_duration,
+            config.clone(),
         ));
         let query_stage_event_loop = EventLoop::new(
             "query_stage".to_owned(),
@@ -101,7 +97,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
             state,
             query_stage_event_loop,
             query_stage_scheduler,
-            executor_termination_grace_period: 
config.executor_termination_grace_period,
+            config,
         }
     }
 
@@ -110,7 +106,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
         scheduler_name: String,
         cluster: BallistaCluster,
         codec: BallistaCodec<T, U>,
-        config: SchedulerConfig,
+        config: Arc<SchedulerConfig>,
         metrics_collector: Arc<dyn SchedulerMetricsCollector>,
         task_launcher: Arc<dyn TaskLauncher>,
     ) -> Self {
@@ -124,8 +120,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
         let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
             state.clone(),
             metrics_collector,
-            config.job_resubmit_interval_ms,
-            config.scheduler_event_expected_processing_duration,
+            config.clone(),
         ));
         let query_stage_event_loop = EventLoop::new(
             "query_stage".to_owned(),
@@ -139,7 +134,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
             state,
             query_stage_event_loop,
             query_stage_scheduler,
-            executor_termination_grace_period: 
config.executor_termination_grace_period,
+            config,
         }
     }
 
@@ -224,12 +219,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
     fn expire_dead_executors(&self) -> Result<()> {
         let state = self.state.clone();
         let event_sender = self.query_stage_event_loop.get_sender()?;
-        let termination_grace_period = self.executor_termination_grace_period;
         tokio::task::spawn(async move {
             loop {
-                let expired_executors = state
-                    .executor_manager
-                    .get_expired_executors(termination_grace_period);
+                let expired_executors = 
state.executor_manager.get_expired_executors();
                 for expired in expired_executors {
                     let executor_id = expired.executor_id.clone();
                     let executor_manager = state.executor_manager.clone();
@@ -246,11 +238,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
 
                     let stop_reason = if terminating {
                         format!(
-                        "TERMINATING executor {executor_id} heartbeat timed 
out after {termination_grace_period}s"
+                        "TERMINATING executor {executor_id} heartbeat timed 
out after {}s", state.config.executor_termination_grace_period,
                     )
                     } else {
                         format!(
-                            "ACTIVE executor {executor_id} heartbeat timed out 
after {DEFAULT_EXECUTOR_TIMEOUT_SECONDS}s",
+                            "ACTIVE executor {executor_id} heartbeat timed out 
after {}s",
+                            state.config.executor_timeout_seconds,
                         )
                     };
 
@@ -296,7 +289,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
                     }
                 }
                 tokio::time::sleep(Duration::from_secs(
-                    EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS,
+                    state.config.expire_dead_executor_interval_seconds,
                 ))
                 .await;
             }
@@ -687,12 +680,13 @@ mod test {
     ) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
         let cluster = test_cluster_context();
 
+        let config = 
SchedulerConfig::default().with_scheduler_policy(scheduling_policy);
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
                 cluster,
                 BallistaCodec::default(),
-                
SchedulerConfig::default().with_scheduler_policy(scheduling_policy),
+                Arc::new(config),
                 Arc::new(TestMetricsCollector::default()),
             );
         scheduler.init().await?;
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs 
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index eb6f7504..1cd611b8 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -25,6 +25,7 @@ use log::{debug, error, info, warn};
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::event_loop::{EventAction, EventSender};
 
+use crate::config::SchedulerConfig;
 use crate::metrics::SchedulerMetricsCollector;
 use crate::scheduler_server::timestamp_millis;
 use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -44,23 +45,20 @@ pub(crate) struct QueryStageScheduler<
     state: Arc<SchedulerState<T, U>>,
     metrics_collector: Arc<dyn SchedulerMetricsCollector>,
     pending_tasks: AtomicUsize,
-    job_resubmit_interval_ms: Option<u64>,
-    event_expected_processing_duration: u64,
+    config: Arc<SchedulerConfig>,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> 
QueryStageScheduler<T, U> {
     pub(crate) fn new(
         state: Arc<SchedulerState<T, U>>,
         metrics_collector: Arc<dyn SchedulerMetricsCollector>,
-        job_resubmit_interval_ms: Option<u64>,
-        event_expected_processing_duration: u64,
+        config: Arc<SchedulerConfig>,
     ) -> Self {
         Self {
             state,
             metrics_collector,
             pending_tasks: AtomicUsize::default(),
-            job_resubmit_interval_ms,
-            event_expected_processing_duration,
+            config,
         }
     }
 
@@ -98,7 +96,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         _rx_event: &mpsc::Receiver<QueryStageSchedulerEvent>,
     ) -> Result<()> {
         let mut time_recorder = None;
-        if self.event_expected_processing_duration > 0 {
+        if self.config.scheduler_event_expected_processing_duration > 0 {
             time_recorder = Some((Instant::now(), event.clone()));
         };
         let tx_event = EventSender::new(tx_event.clone());
@@ -193,9 +191,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                         .map(|res| res.assign(job_id.clone()))
                         .collect();
 
-                    if reservations.is_empty() && 
self.job_resubmit_interval_ms.is_some()
+                    if reservations.is_empty()
+                        && self.config.job_resubmit_interval_ms.is_some()
                     {
-                        let wait_ms = self.job_resubmit_interval_ms.unwrap();
+                        let wait_ms = 
self.config.job_resubmit_interval_ms.unwrap();
 
                         debug!(
                             "No task slots reserved for job {job_id}, 
resubmitting after {wait_ms}ms"
@@ -377,8 +376,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
         }
         if let Some((start, ec)) = time_recorder {
             let duration = start.elapsed();
-            if duration.ge(&core::time::Duration::from_micros(
-                self.event_expected_processing_duration,
+            if duration.ge(&Duration::from_micros(
+                self.config.scheduler_event_expected_processing_duration,
             )) {
                 warn!(
                     "[METRICS] {:?} event cost {:?} us!",
diff --git a/ballista/scheduler/src/standalone.rs 
b/ballista/scheduler/src/standalone.rs
index 5334ae1d..cb74b2bf 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -29,6 +29,7 @@ use datafusion_proto::protobuf::LogicalPlanNode;
 use datafusion_proto::protobuf::PhysicalPlanNode;
 use log::info;
 use std::net::SocketAddr;
+use std::sync::Arc;
 use tokio::net::TcpListener;
 
 pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
@@ -46,7 +47,7 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr> 
{
             "localhost:50050".to_owned(),
             cluster,
             BallistaCodec::default(),
-            SchedulerConfig::default(),
+            Arc::new(SchedulerConfig::default()),
             metrics_collector,
         );
 
diff --git a/ballista/scheduler/src/state/executor_manager.rs 
b/ballista/scheduler/src/state/executor_manager.rs
index 63451014..7a6bf461 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use std::time::Duration;
 
 #[cfg(not(test))]
 use ballista_core::error::BallistaError;
@@ -23,7 +23,7 @@ use ballista_core::error::Result;
 use ballista_core::serde::protobuf;
 
 use crate::cluster::ClusterState;
-use crate::config::TaskDistribution;
+use crate::config::SchedulerConfig;
 
 use crate::state::execution_graph::RunningTaskInfo;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
@@ -31,7 +31,7 @@ use ballista_core::serde::protobuf::{
     executor_status, CancelTasksParams, ExecutorHeartbeat, RemoveJobDataParams,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
-use ballista_core::utils::create_grpc_client_connection;
+use ballista_core::utils::{create_grpc_client_connection, get_time_before};
 use dashmap::DashMap;
 use log::{debug, error, info, warn};
 use std::collections::{HashMap, HashSet};
@@ -76,31 +76,21 @@ impl ExecutorReservation {
     }
 }
 
-// TODO move to configuration file
-/// Default executor timeout in seconds, it should be longer than executor's 
heartbeat intervals.
-/// Only after missing two or tree consecutive heartbeats from a executor, the 
executor is mark
-/// to be dead.
-pub const DEFAULT_EXECUTOR_TIMEOUT_SECONDS: u64 = 180;
-
-// TODO move to configuration file
-/// Interval check for expired or dead executors
-pub const EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS: u64 = 15;
-
 #[derive(Clone)]
 pub struct ExecutorManager {
-    task_distribution: TaskDistribution,
     cluster_state: Arc<dyn ClusterState>,
+    config: Arc<SchedulerConfig>,
     clients: ExecutorClients,
 }
 
 impl ExecutorManager {
     pub(crate) fn new(
         cluster_state: Arc<dyn ClusterState>,
-        task_distribution: TaskDistribution,
+        config: Arc<SchedulerConfig>,
     ) -> Self {
         Self {
-            task_distribution,
             cluster_state,
+            config,
             clients: Default::default(),
         }
     }
@@ -115,12 +105,12 @@ impl ExecutorManager {
     /// for scheduling.
     /// This operation is atomic, so if this method return an Err, no slots 
have been reserved.
     pub async fn reserve_slots(&self, n: u32) -> 
Result<Vec<ExecutorReservation>> {
-        let alive_executors = self.get_alive_executors_within_one_minute();
+        let alive_executors = self.get_alive_executors();
 
         debug!("Alive executors: {alive_executors:?}");
 
         self.cluster_state
-            .reserve_slots(n, self.task_distribution, Some(alive_executors))
+            .reserve_slots(n, self.config.task_distribution, 
Some(alive_executors))
             .await
     }
 
@@ -205,7 +195,7 @@ impl ExecutorManager {
 
     /// Send rpc to Executors to clean up the job data
     async fn clean_up_job_data_inner(&self, job_id: String) {
-        let alive_executors = self.get_alive_executors_within_one_minute();
+        let alive_executors = self.get_alive_executors();
         for executor in alive_executors {
             let job_id_clone = job_id.to_owned();
             if let Ok(mut client) = self.get_client(&executor).await {
@@ -391,10 +381,9 @@ impl ExecutorManager {
 
     /// Retrieve the set of all executor IDs where the executor has been 
observed in the last
     /// `last_seen_ts_threshold` seconds.
-    pub(crate) fn get_alive_executors(
-        &self,
-        last_seen_ts_threshold: u64,
-    ) -> HashSet<String> {
+    pub(crate) fn get_alive_executors(&self) -> HashSet<String> {
+        let last_seen_ts_threshold =
+            get_time_before(self.config.executor_timeout_seconds);
         self.cluster_state
             .executor_heartbeats()
             .iter()
@@ -414,24 +403,13 @@ impl ExecutorManager {
     }
 
     /// Return a list of expired executors
-    pub(crate) fn get_expired_executors(
-        &self,
-        termination_grace_period: u64,
-    ) -> Vec<ExecutorHeartbeat> {
-        let now_epoch_ts = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .expect("Time went backwards");
+    pub(crate) fn get_expired_executors(&self) -> Vec<ExecutorHeartbeat> {
         // Threshold for last heartbeat from Active executor before marking 
dead
-        let last_seen_threshold = now_epoch_ts
-            .checked_sub(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS))
-            .unwrap_or_else(|| Duration::from_secs(0))
-            .as_secs();
+        let last_seen_threshold = 
get_time_before(self.config.executor_timeout_seconds);
 
         // Threshold for last heartbeat for Fenced executor before marking dead
-        let termination_wait_threshold = now_epoch_ts
-            .checked_sub(Duration::from_secs(termination_grace_period))
-            .unwrap_or_else(|| Duration::from_secs(0))
-            .as_secs();
+        let termination_wait_threshold =
+            get_time_before(self.config.executor_termination_grace_period);
 
         self.cluster_state
             .executor_heartbeats()
@@ -455,22 +433,12 @@ impl ExecutorManager {
             })
             .collect::<Vec<_>>()
     }
-
-    pub(crate) fn get_alive_executors_within_one_minute(&self) -> 
HashSet<String> {
-        let now_epoch_ts = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .expect("Time went backwards");
-        let last_seen_threshold = now_epoch_ts
-            .checked_sub(Duration::from_secs(60))
-            .unwrap_or_else(|| Duration::from_secs(0));
-        self.get_alive_executors(last_seen_threshold.as_secs())
-    }
 }
 
 #[cfg(test)]
 mod test {
-
-    use crate::config::TaskDistribution;
+    use crate::config::{SchedulerConfig, TaskDistribution};
+    use std::sync::Arc;
 
     use crate::scheduler_server::timestamp_secs;
     use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
@@ -495,8 +463,9 @@ mod test {
     ) -> Result<()> {
         let cluster = test_cluster_context();
 
+        let config = 
SchedulerConfig::default().with_task_distribution(task_distribution);
         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), task_distribution);
+            ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
 
         let executors = test_executors(10, 4);
 
@@ -543,8 +512,9 @@ mod test {
     ) -> Result<()> {
         let cluster = test_cluster_context();
 
+        let config = 
SchedulerConfig::default().with_task_distribution(task_distribution);
         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), task_distribution);
+            ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
 
         let executors = test_executors(10, 4);
 
@@ -598,9 +568,10 @@ mod test {
 
         let executors = test_executors(10, 4);
 
+        let config = 
SchedulerConfig::default().with_task_distribution(task_distribution);
         let cluster = test_cluster_context();
         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), task_distribution);
+            ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
 
         for (executor_metadata, executor_data) in executors {
             executor_manager
@@ -646,8 +617,9 @@ mod test {
     ) -> Result<()> {
         let cluster = test_cluster_context();
 
+        let config = 
SchedulerConfig::default().with_task_distribution(task_distribution);
         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), task_distribution);
+            ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
 
         let executors = test_executors(10, 4);
 
@@ -680,8 +652,9 @@ mod test {
     ) -> Result<()> {
         let cluster = test_cluster_context();
 
+        let config = 
SchedulerConfig::default().with_task_distribution(task_distribution);
         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), task_distribution);
+            ExecutorManager::new(cluster.cluster_state(), Arc::new(config));
 
         // Setup two executors initially
         let executors = test_executors(2, 4);
diff --git a/ballista/scheduler/src/state/mod.rs 
b/ballista/scheduler/src/state/mod.rs
index c37dd280..2ad2d707 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -91,33 +91,20 @@ pub struct SchedulerState<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionPl
     pub task_manager: TaskManager<T, U>,
     pub session_manager: SessionManager,
     pub codec: BallistaCodec<T, U>,
-    pub config: SchedulerConfig,
+    pub config: Arc<SchedulerConfig>,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> 
SchedulerState<T, U> {
-    #[cfg(test)]
-    pub fn new_with_default_scheduler_name(
-        cluster: BallistaCluster,
-        codec: BallistaCodec<T, U>,
-    ) -> Self {
-        SchedulerState::new(
-            cluster,
-            codec,
-            "localhost:50050".to_owned(),
-            SchedulerConfig::default(),
-        )
-    }
-
     pub fn new(
         cluster: BallistaCluster,
         codec: BallistaCodec<T, U>,
         scheduler_name: String,
-        config: SchedulerConfig,
+        config: Arc<SchedulerConfig>,
     ) -> Self {
         Self {
             executor_manager: ExecutorManager::new(
                 cluster.cluster_state(),
-                config.task_distribution,
+                config.clone(),
             ),
             task_manager: TaskManager::new(
                 cluster.job_state(),
@@ -130,18 +117,27 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
         }
     }
 
+    #[cfg(test)]
+    pub fn new_with_default_scheduler_name(
+        cluster: BallistaCluster,
+        codec: BallistaCodec<T, U>,
+    ) -> Self {
+        let config = Arc::new(SchedulerConfig::default());
+        SchedulerState::new(cluster, codec, "localhost:50050".to_owned(), 
config)
+    }
+
     #[allow(dead_code)]
     pub(crate) fn new_with_task_launcher(
         cluster: BallistaCluster,
         codec: BallistaCodec<T, U>,
         scheduler_name: String,
-        config: SchedulerConfig,
+        config: Arc<SchedulerConfig>,
         dispatcher: Arc<dyn TaskLauncher>,
     ) -> Self {
         Self {
             executor_manager: ExecutorManager::new(
                 cluster.cluster_state(),
-                config.task_distribution,
+                config.clone(),
             ),
             task_manager: TaskManager::with_launcher(
                 cluster.job_state(),
@@ -469,12 +465,13 @@ mod test {
             .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
             .build()?;
 
+        let scheduler_config = SchedulerConfig::default();
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
             Arc::new(SchedulerState::new_with_task_launcher(
                 test_cluster_context(),
                 BallistaCodec::default(),
                 TEST_SCHEDULER_NAME.into(),
-                SchedulerConfig::default(),
+                Arc::new(scheduler_config),
                 Arc::new(BlackholeTaskLauncher::default()),
             ));
 
@@ -569,12 +566,13 @@ mod test {
             .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
             .build()?;
 
+        let scheduler_config = SchedulerConfig::default();
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
             Arc::new(SchedulerState::new_with_task_launcher(
                 test_cluster_context(),
                 BallistaCodec::default(),
                 TEST_SCHEDULER_NAME.into(),
-                SchedulerConfig::default(),
+                Arc::new(scheduler_config),
                 Arc::new(BlackholeTaskLauncher::default()),
             ));
 
diff --git a/ballista/scheduler/src/test_utils.rs 
b/ballista/scheduler/src/test_utils.rs
index 217c96c0..beaabca4 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -425,7 +425,7 @@ impl SchedulerTest {
                 "localhost:50050".to_owned(),
                 cluster,
                 BallistaCodec::default(),
-                config,
+                Arc::new(config),
                 metrics_collector,
                 Arc::new(launcher),
             );


Reply via email to