This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 917741ac feat: make gRPC timeout configurations user-configurable 
(#1337)
917741ac is described below

commit 917741ac76f747902adc17f826ae482589613cb0
Author: Yu-Chuan Hung <[email protected]>
AuthorDate: Sun Nov 2 04:55:59 2025 +0800

    feat: make gRPC timeout configurations user-configurable (#1337)
    
    * feat: make gRPC timeout configurations user-configurable
    
    - Extends PR #115 by making gRPC timeout values configurable instead of 
hard-coded.
    
    * refactor: introduce GrpcClientConfig and GrpcServerConfig
    
    - Add GrpcClientConfig and GrpcServerConfig structs
    - Update create_grpc_client_connection/create_grpc_server signatures
    - Move server configs to ExecutorProcessConfig
    - Add standalone prefix to keepalive_timeout config
    
    * Address PR review feedback for gRPC configuration
    
    - Simplify config test to query only ballista.job.name
    - Remove standalone-specific gRPC configurations
    - Implement From trait for GrpcClientConfig conversion
    - Add documentation to gRPC config structs
    - Inline GrpcServerConfig creation in executor and scheduler
    - Pass BallistaConfig to ExecutorManager for proper config propagation
    
    * refactor: simplify GrpcClientConfig usage with Into trait
    
    - Use Into trait conversion for GrpcClientConfig creation
    - Fix clippy needless_borrow warning in executor_manager
    
    * Cargo fmt
    
    * fix: remove unused GrpcClientConfig imports
---
 ballista/client/tests/context_checks.rs            |  13 ++-
 ballista/core/src/client.rs                        |  14 +--
 ballista/core/src/config.rs                        |  47 ++++++++-
 .../core/src/execution_plans/distributed_query.rs  |   6 +-
 ballista/core/src/utils.rs                         | 117 +++++++++++++++++++--
 ballista/executor/src/config.rs                    |   1 +
 ballista/executor/src/executor_process.rs          |  39 ++++---
 ballista/executor/src/executor_server.rs           |  10 +-
 ballista/executor/src/lib.rs                       |   8 +-
 ballista/executor/src/standalone.rs                |   4 +-
 ballista/scheduler/src/standalone.rs               |   3 +-
 ballista/scheduler/src/state/executor_manager.rs   |  42 ++++++--
 12 files changed, 250 insertions(+), 54 deletions(-)

diff --git a/ballista/client/tests/context_checks.rs 
b/ballista/client/tests/context_checks.rs
index ce834bf5..48895345 100644
--- a/ballista/client/tests/context_checks.rs
+++ b/ballista/client/tests/context_checks.rs
@@ -234,18 +234,17 @@ mod supported {
         assert!(ballista_config_extension.is_some());
 
         let result = ctx
-            .sql("select name, value from information_schema.df_settings where 
name like 'ballista.%' order by name limit 2")
+            .sql("select name, value from information_schema.df_settings where 
name = 'ballista.job.name'")
             .await?
             .collect()
             .await?;
 
         let expected = [
-            "+---------------------------------------+----------+",
-            "| name                                  | value    |",
-            "+---------------------------------------+----------+",
-            "| ballista.grpc_client_max_message_size | 16777216 |",
-            "| ballista.job.name                     |          |",
-            "+---------------------------------------+----------+",
+            "+-------------------+-------+",
+            "| name              | value |",
+            "+-------------------+-------+",
+            "| ballista.job.name |       |",
+            "+-------------------+-------+",
         ];
 
         assert_batches_eq!(expected, &result);
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index 6775dce0..1b354346 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -45,7 +45,7 @@ use datafusion::error::DataFusionError;
 use datafusion::error::Result;
 
 use crate::serde::protobuf;
-use crate::utils::create_grpc_client_connection;
+use crate::utils::{create_grpc_client_connection, GrpcClientConfig};
 use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use futures::{Stream, StreamExt};
 use log::{debug, warn};
@@ -71,15 +71,15 @@ impl BallistaClient {
         max_message_size: usize,
     ) -> BResult<Self> {
         let addr = format!("http://{host}:{port}";);
+        let grpc_config = GrpcClientConfig::default();
         debug!("BallistaClient connecting to {addr}");
-        let connection =
-            create_grpc_client_connection(addr.clone())
-                .await
-                .map_err(|e| {
-                    BallistaError::GrpcConnectionError(format!(
+        let connection = create_grpc_client_connection(addr.clone(), 
&grpc_config)
+            .await
+            .map_err(|e| {
+                BallistaError::GrpcConnectionError(format!(
                     "Error connecting to Ballista scheduler or executor at 
{addr}: {e:?}"
                 ))
-                })?;
+            })?;
         let flight_client = FlightServiceClient::new(connection)
             .max_decoding_message_size(max_message_size)
             .max_encoding_message_size(max_message_size);
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index ff7766de..d862f582 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -39,6 +39,16 @@ pub const BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ: &str =
 pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str =
     "ballista.shuffle.remote_read_prefer_flight";
 
+// gRPC client timeout configurations
+pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS: &str =
+    "ballista.grpc.client.connect_timeout_seconds";
+pub const BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS: &str =
+    "ballista.grpc.client.timeout_seconds";
+pub const BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS: &str =
+    "ballista.grpc.client.tcp_keepalive_seconds";
+pub const BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS: &str =
+    "ballista.grpc.client.http2_keepalive_interval_seconds";
+
 pub type ParseResult<T> = result::Result<T, String>;
 use std::sync::LazyLock;
 
@@ -48,8 +58,8 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> 
= LazyLock::new(||
                          "Sets the job name that will appear in the web user 
interface for any submitted jobs".to_string(),
                          DataType::Utf8, None),
         ConfigEntry::new(BALLISTA_STANDALONE_PARALLELISM.to_string(),
-                        "Standalone processing parallelism ".to_string(),
-                        DataType::UInt16, 
Some(std::thread::available_parallelism().map(|v| 
v.get()).unwrap_or(1).to_string())),
+                         "Standalone processing parallelism ".to_string(),
+                         DataType::UInt16, 
Some(std::thread::available_parallelism().map(|v| 
v.get()).unwrap_or(1).to_string())),
         ConfigEntry::new(BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE.to_string(),
                          "Configuration for max message size in gRPC 
clients".to_string(),
                          DataType::UInt64,
@@ -66,7 +76,22 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, 
ConfigEntry>> = LazyLock::new(||
                          "Forces the shuffle reader to use flight reader 
instead of block reader for remote read. Block reader usually has better 
performance and resource utilization".to_string(),
                          DataType::Boolean,
                          Some((false).to_string())),
-
+        
ConfigEntry::new(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS.to_string(),
+                         "Connection timeout for gRPC client in 
seconds".to_string(),
+                         DataType::UInt64,
+                         Some((20).to_string())),
+        ConfigEntry::new(BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS.to_string(),
+                         "Request timeout for gRPC client in 
seconds".to_string(),
+                         DataType::UInt64,
+                         Some((20).to_string())),
+        
ConfigEntry::new(BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS.to_string(),
+                         "TCP keep-alive interval for gRPC client in 
seconds".to_string(),
+                         DataType::UInt64,
+                         Some((3600).to_string())),
+        
ConfigEntry::new(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS.to_string(),
+                         "HTTP/2 keep-alive interval for gRPC client in 
seconds".to_string(),
+                         DataType::UInt64,
+                         Some((300).to_string()))
     ];
     entries
         .into_iter()
@@ -188,6 +213,22 @@ impl BallistaConfig {
         self.get_usize_setting(BALLISTA_SHUFFLE_READER_MAX_REQUESTS)
     }
 
+    pub fn default_grpc_client_connect_timeout_seconds(&self) -> usize {
+        self.get_usize_setting(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS)
+    }
+
+    pub fn default_grpc_client_timeout_seconds(&self) -> usize {
+        self.get_usize_setting(BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS)
+    }
+
+    pub fn default_grpc_client_tcp_keepalive_seconds(&self) -> usize {
+        self.get_usize_setting(BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS)
+    }
+
+    pub fn default_grpc_client_http2_keepalive_interval_seconds(&self) -> 
usize {
+        
self.get_usize_setting(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS)
+    }
+
     /// Forces the shuffle reader to always read partitions via the Arrow 
Flight client,
     /// even when partitions are local to the node.
     ///
diff --git a/ballista/core/src/execution_plans/distributed_query.rs 
b/ballista/core/src/execution_plans/distributed_query.rs
index eab3c0d0..aae084ad 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -23,7 +23,7 @@ use crate::serde::protobuf::{
     scheduler_grpc_client::SchedulerGrpcClient, ExecuteQueryParams, 
GetJobStatusParams,
     GetJobStatusResult, KeyValuePair, PartitionLocation,
 };
-use crate::utils::create_grpc_client_connection;
+use crate::utils::{create_grpc_client_connection, GrpcClientConfig};
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::record_batch::RecordBatch;
@@ -238,6 +238,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for 
DistributedQueryExec<T> {
                 self.session_id.clone(),
                 query,
                 self.config.default_grpc_client_max_message_size(),
+                GrpcClientConfig::from(&self.config),
             )
             .map_err(|e| ArrowError::ExternalError(Box::new(e))),
         )
@@ -274,10 +275,11 @@ async fn execute_query(
     session_id: String,
     query: ExecuteQueryParams,
     max_message_size: usize,
+    grpc_config: GrpcClientConfig,
 ) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
     info!("Connecting to Ballista scheduler at {scheduler_url}");
     // TODO reuse the scheduler to avoid connecting to the Ballista scheduler 
again and again
-    let connection = create_grpc_client_connection(scheduler_url)
+    let connection = create_grpc_client_connection(scheduler_url, &grpc_config)
         .await
         .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
 
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 1e113157..248dbbe0 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::config::BallistaConfig;
 use crate::error::{BallistaError, Result};
 use crate::extension::SessionConfigExt;
 use crate::serde::scheduler::PartitionStats;
@@ -36,6 +37,95 @@ use std::{fs::File, pin::Pin};
 use tonic::codegen::StdError;
 use tonic::transport::{Channel, Error, Server};
 
+/// Configuration for gRPC client connections.
+///
+/// This struct holds timeout and keep-alive settings that are applied
+/// when establishing gRPC connections from executors to schedulers or
+/// between distributed components.
+///
+/// # Examples
+///
+/// ```
+/// use ballista_core::config::BallistaConfig;
+/// use ballista_core::utils::GrpcClientConfig;
+///
+/// let ballista_config = BallistaConfig::default();
+/// let grpc_config = GrpcClientConfig::from(&ballista_config);
+/// ```
+#[derive(Debug, Clone)]
+pub struct GrpcClientConfig {
+    /// Connection timeout in seconds
+    pub connect_timeout_seconds: u64,
+    /// Request timeout in seconds
+    pub timeout_seconds: u64,
+    /// TCP keep-alive interval in seconds
+    pub tcp_keepalive_seconds: u64,
+    /// HTTP/2 keep-alive ping interval in seconds
+    pub http2_keepalive_interval_seconds: u64,
+}
+
+impl From<&BallistaConfig> for GrpcClientConfig {
+    fn from(config: &BallistaConfig) -> Self {
+        Self {
+            connect_timeout_seconds: 
config.default_grpc_client_connect_timeout_seconds()
+                as u64,
+            timeout_seconds: config.default_grpc_client_timeout_seconds() as 
u64,
+            tcp_keepalive_seconds: 
config.default_grpc_client_tcp_keepalive_seconds()
+                as u64,
+            http2_keepalive_interval_seconds: config
+                .default_grpc_client_http2_keepalive_interval_seconds()
+                as u64,
+        }
+    }
+}
+
+impl Default for GrpcClientConfig {
+    fn default() -> Self {
+        Self {
+            connect_timeout_seconds: 20,
+            timeout_seconds: 20,
+            tcp_keepalive_seconds: 3600,
+            http2_keepalive_interval_seconds: 300,
+        }
+    }
+}
+
+/// Configuration for gRPC server.
+///
+/// This struct holds timeout and keep-alive settings that are applied
+/// when creating gRPC servers in executors and schedulers.
+///
+/// # Examples
+///
+/// ```
+/// use ballista_core::utils::GrpcServerConfig;
+///
+/// let server_config = GrpcServerConfig::default();
+/// let server = ballista_core::utils::create_grpc_server(&server_config);
+/// ```
+#[derive(Debug, Clone)]
+pub struct GrpcServerConfig {
+    /// Request timeout in seconds
+    pub timeout_seconds: u64,
+    /// TCP keep-alive interval in seconds
+    pub tcp_keepalive_seconds: u64,
+    /// HTTP/2 keep-alive ping interval in seconds
+    pub http2_keepalive_interval_seconds: u64,
+    /// HTTP/2 keep-alive ping timeout in seconds
+    pub http2_keepalive_timeout_seconds: u64,
+}
+
+impl Default for GrpcServerConfig {
+    fn default() -> Self {
+        Self {
+            timeout_seconds: 20,
+            tcp_keepalive_seconds: 3600,
+            http2_keepalive_interval_seconds: 300,
+            http2_keepalive_timeout_seconds: 20,
+        }
+    }
+}
+
 /// Default session builder using the provided configuration
 pub fn default_session_builder(
     config: SessionConfig,
@@ -106,31 +196,40 @@ pub async fn collect_stream(
 
 pub async fn create_grpc_client_connection<D>(
     dst: D,
+    config: &GrpcClientConfig,
 ) -> std::result::Result<Channel, Error>
 where
     D: std::convert::TryInto<tonic::transport::Endpoint>,
     D::Error: Into<StdError>,
 {
     let endpoint = tonic::transport::Endpoint::new(dst)?
-        .connect_timeout(Duration::from_secs(20))
-        .timeout(Duration::from_secs(20))
+        .connect_timeout(Duration::from_secs(config.connect_timeout_seconds))
+        .timeout(Duration::from_secs(config.timeout_seconds))
         // Disable Nagle's Algorithm since we don't want packets to wait
         .tcp_nodelay(true)
-        .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
-        .http2_keep_alive_interval(Duration::from_secs(300))
+        .tcp_keepalive(Some(Duration::from_secs(config.tcp_keepalive_seconds)))
+        .http2_keep_alive_interval(Duration::from_secs(
+            config.http2_keepalive_interval_seconds,
+        ))
+        // Use a fixed timeout for keep-alive pings to keep configuration 
simple
+        // since this is a standalone configuration
         .keep_alive_timeout(Duration::from_secs(20))
         .keep_alive_while_idle(true);
     endpoint.connect().await
 }
 
-pub fn create_grpc_server() -> Server {
+pub fn create_grpc_server(config: &GrpcServerConfig) -> Server {
     Server::builder()
-        .timeout(Duration::from_secs(20))
+        .timeout(Duration::from_secs(config.timeout_seconds))
         // Disable Nagle's Algorithm since we don't want packets to wait
         .tcp_nodelay(true)
-        .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
-        .http2_keepalive_interval(Option::Some(Duration::from_secs(300)))
-        .http2_keepalive_timeout(Option::Some(Duration::from_secs(20)))
+        .tcp_keepalive(Some(Duration::from_secs(config.tcp_keepalive_seconds)))
+        .http2_keepalive_interval(Some(Duration::from_secs(
+            config.http2_keepalive_interval_seconds,
+        )))
+        .http2_keepalive_timeout(Some(Duration::from_secs(
+            config.http2_keepalive_timeout_seconds,
+        )))
 }
 
 pub fn collect_plan_metrics(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs
index 9fa2472b..7e486bd1 100644
--- a/ballista/executor/src/config.rs
+++ b/ballista/executor/src/config.rs
@@ -139,6 +139,7 @@ impl TryFrom<Config> for ExecutorProcessConfig {
             job_data_clean_up_interval_seconds: 
opt.job_data_clean_up_interval_seconds,
             grpc_max_decoding_message_size: 
opt.grpc_server_max_decoding_message_size,
             grpc_max_encoding_message_size: 
opt.grpc_server_max_encoding_message_size,
+            grpc_server_config: 
ballista_core::utils::GrpcServerConfig::default(),
             executor_heartbeat_interval_seconds: 
opt.executor_heartbeat_interval_seconds,
             override_execution_engine: None,
             override_function_registry: None,
diff --git a/ballista/executor/src/executor_process.rs 
b/ballista/executor/src/executor_process.rs
index d365f3cb..4756d89b 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -42,6 +42,7 @@ use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 
 use ballista_core::config::{LogRotationPolicy, TaskSchedulingPolicy};
 use ballista_core::error::BallistaError;
+use ballista_core::extension::SessionConfigExt;
 use ballista_core::serde::protobuf::executor_resource::Resource;
 use ballista_core::serde::protobuf::executor_status::Status;
 use ballista_core::serde::protobuf::{
@@ -53,7 +54,7 @@ use ballista_core::serde::{
 };
 use ballista_core::utils::{
     create_grpc_client_connection, create_grpc_server, default_config_producer,
-    get_time_before,
+    get_time_before, GrpcServerConfig,
 };
 use ballista_core::{ConfigProducer, RuntimeProducer, BALLISTA_VERSION};
 
@@ -88,6 +89,8 @@ pub struct ExecutorProcessConfig {
     pub grpc_max_decoding_message_size: u32,
     /// The maximum size of an encoded message
     pub grpc_max_encoding_message_size: u32,
+    /// gRPC server timeout configuration
+    pub grpc_server_config: GrpcServerConfig,
     pub executor_heartbeat_interval_seconds: u64,
     /// Optional execution engine to use to execute physical plans, will 
default to
     /// DataFusion if none is provided.
@@ -139,6 +142,7 @@ impl Default for ExecutorProcessConfig {
             job_data_clean_up_interval_seconds: 0,
             grpc_max_decoding_message_size: 16777216,
             grpc_max_encoding_message_size: 16777216,
+            grpc_server_config: Default::default(),
             executor_heartbeat_interval_seconds: 60,
             override_execution_engine: None,
             override_function_registry: None,
@@ -245,8 +249,10 @@ pub async fn start_executor_process(
     ));
 
     let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
+    let session_config = (executor.config_producer)();
+    let ballista_config = session_config.ballista_config();
     let connection = if connect_timeout == 0 {
-        create_grpc_client_connection(scheduler_url)
+        create_grpc_client_connection(scheduler_url, 
&(&ballista_config).into())
             .await
             .map_err(|_| {
                 BallistaError::GrpcConnectionError(
@@ -262,13 +268,16 @@ pub async fn start_executor_process(
         while x.is_none()
             && Instant::now().elapsed().as_secs() - start_time < 
connect_timeout
         {
-            match create_grpc_client_connection(scheduler_url.clone())
-                .await
-                .map_err(|_| {
-                    BallistaError::GrpcConnectionError(
-                        "Could not connect to scheduler".to_string(),
-                    )
-                }) {
+            match create_grpc_client_connection(
+                scheduler_url.clone(),
+                &(&ballista_config).into(),
+            )
+            .await
+            .map_err(|_| {
+                BallistaError::GrpcConnectionError(
+                    "Could not connect to scheduler".to_string(),
+                )
+            }) {
                 Ok(connection) => {
                     info!("Connected to scheduler at {scheduler_url}");
                     x = Some(connection);
@@ -371,12 +380,13 @@ pub async fn start_executor_process(
                 shutdown,
                 opt.grpc_max_encoding_message_size as usize,
                 opt.grpc_max_decoding_message_size as usize,
+                opt.grpc_server_config.clone(),
             )
             .await
         }
         Some(flight_provider) => {
             info!("Starting custom, user provided, arrow flight service");
-            (flight_provider)(address, shutdown)
+            (flight_provider)(address, shutdown, 
opt.grpc_server_config.clone())
         }
     });
 
@@ -480,11 +490,12 @@ async fn flight_server_task(
     mut grpc_shutdown: Shutdown,
     max_encoding_message_size: usize,
     max_decoding_message_size: usize,
+    grpc_server_config: GrpcServerConfig,
 ) -> JoinHandle<Result<(), BallistaError>> {
     tokio::spawn(async move {
         info!("Built-in arrow flight server listening on: {address:?} 
max_encoding_size: {max_encoding_message_size} max_decoding_size: 
{max_decoding_message_size}");
 
-        let server_future = create_grpc_server()
+        let server_future = create_grpc_server(&grpc_server_config)
             .add_service(
                 FlightServiceServer::new(BallistaFlightService::new())
                     .max_decoding_message_size(max_decoding_message_size)
@@ -722,13 +733,15 @@ mod tests {
     async fn test_arrow_flight_provider_ergonomics() {
         let config = crate::executor_process::ExecutorProcessConfig {
             override_arrow_flight_service: Some(std::sync::Arc::new(
-                move |address, mut grpc_shutdown| {
+                move |address, mut grpc_shutdown, ballista_config| {
                     tokio::spawn(async move {
                         log::info!(
                             "custom arrow flight server listening on: 
{address:?}"
                         );
 
-                        let server_future = 
ballista_core::utils::create_grpc_server()
+                        let server_future = 
ballista_core::utils::create_grpc_server(
+                            &ballista_config,
+                        )
                         .add_service(
                             
arrow_flight::flight_service_server::FlightServiceServer::new(
                                 
crate::flight_service::BallistaFlightService::new(),
diff --git a/ballista/executor/src/executor_server.rs 
b/ballista/executor/src/executor_server.rs
index 8ca90c3d..1b40650e 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -28,6 +28,7 @@ use tonic::transport::Channel;
 use tonic::{Request, Response, Status};
 
 use ballista_core::error::BallistaError;
+use ballista_core::extension::SessionConfigExt;
 use ballista_core::serde::protobuf::{
     executor_grpc_server::{ExecutorGrpc, ExecutorGrpcServer},
     executor_metric, executor_status,
@@ -104,6 +105,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static 
+ AsExecutionPlan>(
         let executor_meta = executor.metadata.clone();
         let addr = format!("{}:{}", config.bind_host, executor_meta.grpc_port);
         let addr = addr.parse().unwrap();
+        let grpc_server_config = config.grpc_server_config.clone();
 
         info!(
             "Ballista v{BALLISTA_VERSION} Rust Executor Grpc Server listening 
on {addr:?}"
@@ -114,7 +116,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static 
+ AsExecutionPlan>(
         let mut grpc_shutdown = shutdown_noti.subscribe_for_shutdown();
         tokio::spawn(async move {
             let shutdown_signal = grpc_shutdown.recv();
-            let grpc_server_future = create_grpc_server()
+            let grpc_server_future = create_grpc_server(&grpc_server_config)
                 .add_service(server)
                 .serve_with_shutdown(addr, shutdown_signal);
             grpc_server_future.await.map_err(|e| {
@@ -235,7 +237,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorServer<T,
             Ok(scheduler)
         } else {
             let scheduler_url = format!("http://{scheduler_id}";);
-            let connection = 
create_grpc_client_connection(scheduler_url).await?;
+            let session_config = (self.executor.config_producer)();
+            let ballista_config = session_config.ballista_config();
+            let connection =
+                create_grpc_client_connection(scheduler_url, 
&(&ballista_config).into())
+                    .await?;
             let scheduler = SchedulerGrpcClient::new(connection)
                 .max_encoding_message_size(self.grpc_max_encoding_message_size)
                 
.max_decoding_message_size(self.grpc_max_decoding_message_size);
diff --git a/ballista/executor/src/lib.rs b/ballista/executor/src/lib.rs
index 26365162..e9322592 100644
--- a/ballista/executor/src/lib.rs
+++ b/ballista/executor/src/lib.rs
@@ -48,15 +48,21 @@ use ballista_core::serde::protobuf::{
     TaskStatus,
 };
 use ballista_core::serde::scheduler::PartitionId;
+use ballista_core::utils::GrpcServerConfig;
 
 /// [ArrowFlightServerProvider] provides a function which creates a new Arrow 
Flight server.
 ///
 /// The function should take two arguments:
 /// [SocketAddr] - the address to bind the server to
 /// [Shutdown] - a shutdown signal to gracefully shutdown the server
+/// [GrpcServerConfig] - the gRPC server configuration for timeout settings
 /// Returns a [tokio::task::JoinHandle] which will be registered as service 
handler
 ///
-pub type ArrowFlightServerProvider = dyn Fn(SocketAddr, Shutdown) -> 
tokio::task::JoinHandle<Result<(), BallistaError>>
+pub type ArrowFlightServerProvider = dyn Fn(
+        SocketAddr,
+        Shutdown,
+        GrpcServerConfig,
+    ) -> tokio::task::JoinHandle<Result<(), BallistaError>>
     + Send
     + Sync;
 
diff --git a/ballista/executor/src/standalone.rs 
b/ballista/executor/src/standalone.rs
index f439ad30..d9fa3c51 100644
--- a/ballista/executor/src/standalone.rs
+++ b/ballista/executor/src/standalone.rs
@@ -20,7 +20,7 @@ use crate::{execution_loop, executor::Executor, 
flight_service::BallistaFlightSe
 use arrow_flight::flight_service_server::FlightServiceServer;
 use ballista_core::extension::SessionConfigExt;
 use ballista_core::registry::BallistaFunctionRegistry;
-use ballista_core::utils::default_config_producer;
+use ballista_core::utils::{default_config_producer, GrpcServerConfig};
 use ballista_core::{
     error::Result,
     serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, 
ExecutorRegistration},
@@ -124,7 +124,7 @@ pub async fn new_standalone_executor_from_builder(
         .max_encoding_message_size(max_message_size);
 
     tokio::spawn(
-        create_grpc_server()
+        create_grpc_server(&GrpcServerConfig::default())
             .add_service(server)
             
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
                 listener,
diff --git a/ballista/scheduler/src/standalone.rs 
b/ballista/scheduler/src/standalone.rs
index cacc4e28..c4e6295d 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -23,6 +23,7 @@ use ballista_core::extension::SessionConfigExt;
 use ballista_core::serde::BallistaCodec;
 use ballista_core::utils::{
     create_grpc_server, default_config_producer, default_session_builder,
+    GrpcServerConfig,
 };
 use ballista_core::ConfigProducer;
 use ballista_core::{
@@ -93,7 +94,7 @@ pub async fn new_standalone_scheduler_with_builder(
     let addr = listener.local_addr()?;
     info!("Ballista v{BALLISTA_VERSION} Rust Scheduler listening on {addr:?}");
     tokio::spawn(
-        create_grpc_server()
+        create_grpc_server(&GrpcServerConfig::default())
             .add_service(server)
             
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
                 listener,
diff --git a/ballista/scheduler/src/state/executor_manager.rs 
b/ballista/scheduler/src/state/executor_manager.rs
index 9c515d8f..d5f7297b 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -27,13 +27,16 @@ use crate::config::SchedulerConfig;
 
 use crate::state::execution_graph::RunningTaskInfo;
 use crate::state::task_manager::JobInfoCache;
+use ballista_core::extension::SessionConfigExt;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
 use ballista_core::serde::protobuf::{
     executor_status, CancelTasksParams, ExecutorHeartbeat, MultiTaskDefinition,
     RemoveJobDataParams, StopExecutorParams,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
-use ballista_core::utils::{create_grpc_client_connection, get_time_before};
+use ballista_core::utils::{
+    create_grpc_client_connection, get_time_before, GrpcClientConfig,
+};
 use dashmap::DashMap;
 use log::{debug, error, info, warn};
 use std::collections::{HashMap, HashSet};
@@ -48,6 +51,7 @@ pub struct ExecutorManager {
     config: Arc<SchedulerConfig>,
     clients: ExecutorClients,
     pending_cleanup_jobs: Arc<DashMap<String, HashSet<String>>>,
+    grpc_client_config: GrpcClientConfig,
 }
 
 impl ExecutorManager {
@@ -55,11 +59,20 @@ impl ExecutorManager {
         cluster_state: Arc<dyn ClusterState>,
         config: Arc<SchedulerConfig>,
     ) -> Self {
+        let grpc_client_config =
+            if let Some(config_producer) = &config.override_config_producer {
+                let session_config = config_producer();
+                let ballista_config = session_config.ballista_config();
+                GrpcClientConfig::from(&ballista_config)
+            } else {
+                GrpcClientConfig::default()
+            };
         Self {
             cluster_state,
             config,
             clients: Default::default(),
             pending_cleanup_jobs: Default::default(),
+            grpc_client_config,
         }
     }
 
@@ -117,7 +130,10 @@ impl ExecutorManager {
         let executor_manager = self.clone();
         tokio::spawn(async move {
             for (executor_id, infos) in tasks_to_cancel {
-                if let Ok(mut client) = 
executor_manager.get_client(&executor_id).await {
+                if let Ok(mut client) = executor_manager
+                    .get_client(&executor_id, 
&executor_manager.grpc_client_config)
+                    .await
+                {
                     if let Err(e) = client
                         .cancel_tasks(CancelTasksParams { task_infos: infos })
                         .await
@@ -174,7 +190,9 @@ impl ExecutorManager {
             let job_id_clone = job_id.to_owned();
 
             if self.config.is_push_staged_scheduling() {
-                if let Ok(mut client) = self.get_client(&executor).await {
+                if let Ok(mut client) =
+                    self.get_client(&executor, &self.grpc_client_config).await
+                {
                     tokio::spawn(async move {
                         if let Err(err) = client
                             .remove_job_data(RemoveJobDataParams {
@@ -272,7 +290,10 @@ impl ExecutorManager {
 
     pub async fn stop_executor(&self, executor_id: &str, stop_reason: String) {
         let executor_id = executor_id.to_string();
-        match self.get_client(&executor_id).await {
+        match self
+            .get_client(&executor_id, &self.grpc_client_config)
+            .await
+        {
             Ok(mut client) => {
                 tokio::task::spawn(async move {
                     match client
@@ -304,7 +325,9 @@ impl ExecutorManager {
         multi_tasks: Vec<MultiTaskDefinition>,
         scheduler_id: String,
     ) -> Result<()> {
-        let mut client = self.get_client(executor_id).await?;
+        let mut client = self
+            .get_client(executor_id, &self.grpc_client_config)
+            .await?;
         client
             .launch_multi_task(protobuf::LaunchMultiTaskParams {
                 multi_tasks,
@@ -416,7 +439,11 @@ impl ExecutorManager {
             .collect::<Vec<_>>()
     }
 
-    async fn get_client(&self, executor_id: &str) -> 
Result<ExecutorGrpcClient<Channel>> {
+    async fn get_client(
+        &self,
+        executor_id: &str,
+        grpc_client_config: &GrpcClientConfig,
+    ) -> Result<ExecutorGrpcClient<Channel>> {
         let client = self.clients.get(executor_id).map(|value| value.clone());
 
         if let Some(client) = client {
@@ -427,7 +454,8 @@ impl ExecutorManager {
                 "http://{}:{}";,
                 executor_metadata.host, executor_metadata.grpc_port
             );
-            let connection = 
create_grpc_client_connection(executor_url).await?;
+            let connection =
+                create_grpc_client_connection(executor_url, 
grpc_client_config).await?;
             let client = ExecutorGrpcClient::new(connection);
 
             {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to