This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 917741ac feat: make gRPC timeout configurations user-configurable
(#1337)
917741ac is described below
commit 917741ac76f747902adc17f826ae482589613cb0
Author: Yu-Chuan Hung <[email protected]>
AuthorDate: Sun Nov 2 04:55:59 2025 +0800
feat: make gRPC timeout configurations user-configurable (#1337)
* feat: make gRPC timeout configurations user-configurable
- Extends PR #115 by making gRPC timeout values configurable instead of
hard-coded.
* refactor: introduce GrpcClientConfig and GrpcServerConfig
- Add GrpcClientConfig and GrpcServerConfig structs
- Update create_grpc_client_connection/create_grpc_server signatures
- Move server configs to ExecutorProcessConfig
- Add standalone prefix to keepalive_timeout config
* Address PR review feedback for gRPC configuration
- Simplify config test to query only ballista.job.name
- Remove standalone-specific gRPC configurations
- Implement From trait for GrpcClientConfig conversion
- Add documentation to gRPC config structs
- Inline GrpcServerConfig creation in executor and scheduler
- Pass BallistaConfig to ExecutorManager for proper config propagation
* refactor: simplify GrpcClientConfig usage with Into trait
- Use Into trait conversion for GrpcClientConfig creation
- Fix clippy needless_borrow warning in executor_manager
* Cargo fmt
* fix: remove unused GrpcClientConfig imports
---
ballista/client/tests/context_checks.rs | 13 ++-
ballista/core/src/client.rs | 14 +--
ballista/core/src/config.rs | 47 ++++++++-
.../core/src/execution_plans/distributed_query.rs | 6 +-
ballista/core/src/utils.rs | 117 +++++++++++++++++++--
ballista/executor/src/config.rs | 1 +
ballista/executor/src/executor_process.rs | 39 ++++---
ballista/executor/src/executor_server.rs | 10 +-
ballista/executor/src/lib.rs | 8 +-
ballista/executor/src/standalone.rs | 4 +-
ballista/scheduler/src/standalone.rs | 3 +-
ballista/scheduler/src/state/executor_manager.rs | 42 ++++++--
12 files changed, 250 insertions(+), 54 deletions(-)
diff --git a/ballista/client/tests/context_checks.rs
b/ballista/client/tests/context_checks.rs
index ce834bf5..48895345 100644
--- a/ballista/client/tests/context_checks.rs
+++ b/ballista/client/tests/context_checks.rs
@@ -234,18 +234,17 @@ mod supported {
assert!(ballista_config_extension.is_some());
let result = ctx
- .sql("select name, value from information_schema.df_settings where
name like 'ballista.%' order by name limit 2")
+ .sql("select name, value from information_schema.df_settings where
name = 'ballista.job.name'")
.await?
.collect()
.await?;
let expected = [
- "+---------------------------------------+----------+",
- "| name | value |",
- "+---------------------------------------+----------+",
- "| ballista.grpc_client_max_message_size | 16777216 |",
- "| ballista.job.name | |",
- "+---------------------------------------+----------+",
+ "+-------------------+-------+",
+ "| name | value |",
+ "+-------------------+-------+",
+ "| ballista.job.name | |",
+ "+-------------------+-------+",
];
assert_batches_eq!(expected, &result);
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index 6775dce0..1b354346 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -45,7 +45,7 @@ use datafusion::error::DataFusionError;
use datafusion::error::Result;
use crate::serde::protobuf;
-use crate::utils::create_grpc_client_connection;
+use crate::utils::{create_grpc_client_connection, GrpcClientConfig};
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use log::{debug, warn};
@@ -71,15 +71,15 @@ impl BallistaClient {
max_message_size: usize,
) -> BResult<Self> {
let addr = format!("http://{host}:{port}");
+ let grpc_config = GrpcClientConfig::default();
debug!("BallistaClient connecting to {addr}");
- let connection =
- create_grpc_client_connection(addr.clone())
- .await
- .map_err(|e| {
- BallistaError::GrpcConnectionError(format!(
+ let connection = create_grpc_client_connection(addr.clone(),
&grpc_config)
+ .await
+ .map_err(|e| {
+ BallistaError::GrpcConnectionError(format!(
"Error connecting to Ballista scheduler or executor at
{addr}: {e:?}"
))
- })?;
+ })?;
let flight_client = FlightServiceClient::new(connection)
.max_decoding_message_size(max_message_size)
.max_encoding_message_size(max_message_size);
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index ff7766de..d862f582 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -39,6 +39,16 @@ pub const BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ: &str =
pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str =
"ballista.shuffle.remote_read_prefer_flight";
+// gRPC client timeout configurations
+pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS: &str =
+ "ballista.grpc.client.connect_timeout_seconds";
+pub const BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS: &str =
+ "ballista.grpc.client.timeout_seconds";
+pub const BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS: &str =
+ "ballista.grpc.client.tcp_keepalive_seconds";
+pub const BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS: &str =
+ "ballista.grpc.client.http2_keepalive_interval_seconds";
+
pub type ParseResult<T> = result::Result<T, String>;
use std::sync::LazyLock;
@@ -48,8 +58,8 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>>
= LazyLock::new(||
"Sets the job name that will appear in the web user
interface for any submitted jobs".to_string(),
DataType::Utf8, None),
ConfigEntry::new(BALLISTA_STANDALONE_PARALLELISM.to_string(),
- "Standalone processing parallelism ".to_string(),
- DataType::UInt16,
Some(std::thread::available_parallelism().map(|v|
v.get()).unwrap_or(1).to_string())),
+ "Standalone processing parallelism ".to_string(),
+ DataType::UInt16,
Some(std::thread::available_parallelism().map(|v|
v.get()).unwrap_or(1).to_string())),
ConfigEntry::new(BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE.to_string(),
"Configuration for max message size in gRPC
clients".to_string(),
DataType::UInt64,
@@ -66,7 +76,22 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String,
ConfigEntry>> = LazyLock::new(||
"Forces the shuffle reader to use flight reader
instead of block reader for remote read. Block reader usually has better
performance and resource utilization".to_string(),
DataType::Boolean,
Some((false).to_string())),
-
+
ConfigEntry::new(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS.to_string(),
+ "Connection timeout for gRPC client in
seconds".to_string(),
+ DataType::UInt64,
+ Some((20).to_string())),
+ ConfigEntry::new(BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS.to_string(),
+ "Request timeout for gRPC client in
seconds".to_string(),
+ DataType::UInt64,
+ Some((20).to_string())),
+
ConfigEntry::new(BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS.to_string(),
+ "TCP keep-alive interval for gRPC client in
seconds".to_string(),
+ DataType::UInt64,
+ Some((3600).to_string())),
+
ConfigEntry::new(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS.to_string(),
+ "HTTP/2 keep-alive interval for gRPC client in
seconds".to_string(),
+ DataType::UInt64,
+ Some((300).to_string()))
];
entries
.into_iter()
@@ -188,6 +213,22 @@ impl BallistaConfig {
self.get_usize_setting(BALLISTA_SHUFFLE_READER_MAX_REQUESTS)
}
+ pub fn default_grpc_client_connect_timeout_seconds(&self) -> usize {
+ self.get_usize_setting(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS)
+ }
+
+ pub fn default_grpc_client_timeout_seconds(&self) -> usize {
+ self.get_usize_setting(BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS)
+ }
+
+ pub fn default_grpc_client_tcp_keepalive_seconds(&self) -> usize {
+ self.get_usize_setting(BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS)
+ }
+
+ pub fn default_grpc_client_http2_keepalive_interval_seconds(&self) ->
usize {
+
self.get_usize_setting(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS)
+ }
+
/// Forces the shuffle reader to always read partitions via the Arrow
Flight client,
/// even when partitions are local to the node.
///
diff --git a/ballista/core/src/execution_plans/distributed_query.rs
b/ballista/core/src/execution_plans/distributed_query.rs
index eab3c0d0..aae084ad 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -23,7 +23,7 @@ use crate::serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, ExecuteQueryParams,
GetJobStatusParams,
GetJobStatusResult, KeyValuePair, PartitionLocation,
};
-use crate::utils::create_grpc_client_connection;
+use crate::utils::{create_grpc_client_connection, GrpcClientConfig};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
@@ -238,6 +238,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for
DistributedQueryExec<T> {
self.session_id.clone(),
query,
self.config.default_grpc_client_max_message_size(),
+ GrpcClientConfig::from(&self.config),
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
@@ -274,10 +275,11 @@ async fn execute_query(
session_id: String,
query: ExecuteQueryParams,
max_message_size: usize,
+ grpc_config: GrpcClientConfig,
) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
info!("Connecting to Ballista scheduler at {scheduler_url}");
// TODO reuse the scheduler to avoid connecting to the Ballista scheduler
again and again
- let connection = create_grpc_client_connection(scheduler_url)
+ let connection = create_grpc_client_connection(scheduler_url, &grpc_config)
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 1e113157..248dbbe0 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::config::BallistaConfig;
use crate::error::{BallistaError, Result};
use crate::extension::SessionConfigExt;
use crate::serde::scheduler::PartitionStats;
@@ -36,6 +37,95 @@ use std::{fs::File, pin::Pin};
use tonic::codegen::StdError;
use tonic::transport::{Channel, Error, Server};
+/// Configuration for gRPC client connections.
+///
+/// This struct holds timeout and keep-alive settings that are applied
+/// when establishing gRPC connections from executors to schedulers or
+/// between distributed components.
+///
+/// # Examples
+///
+/// ```
+/// use ballista_core::config::BallistaConfig;
+/// use ballista_core::utils::GrpcClientConfig;
+///
+/// let ballista_config = BallistaConfig::default();
+/// let grpc_config = GrpcClientConfig::from(&ballista_config);
+/// ```
+#[derive(Debug, Clone)]
+pub struct GrpcClientConfig {
+ /// Connection timeout in seconds
+ pub connect_timeout_seconds: u64,
+ /// Request timeout in seconds
+ pub timeout_seconds: u64,
+ /// TCP keep-alive interval in seconds
+ pub tcp_keepalive_seconds: u64,
+ /// HTTP/2 keep-alive ping interval in seconds
+ pub http2_keepalive_interval_seconds: u64,
+}
+
+impl From<&BallistaConfig> for GrpcClientConfig {
+ fn from(config: &BallistaConfig) -> Self {
+ Self {
+ connect_timeout_seconds:
config.default_grpc_client_connect_timeout_seconds()
+ as u64,
+ timeout_seconds: config.default_grpc_client_timeout_seconds() as
u64,
+ tcp_keepalive_seconds:
config.default_grpc_client_tcp_keepalive_seconds()
+ as u64,
+ http2_keepalive_interval_seconds: config
+ .default_grpc_client_http2_keepalive_interval_seconds()
+ as u64,
+ }
+ }
+}
+
+impl Default for GrpcClientConfig {
+ fn default() -> Self {
+ Self {
+ connect_timeout_seconds: 20,
+ timeout_seconds: 20,
+ tcp_keepalive_seconds: 3600,
+ http2_keepalive_interval_seconds: 300,
+ }
+ }
+}
+
+/// Configuration for gRPC server.
+///
+/// This struct holds timeout and keep-alive settings that are applied
+/// when creating gRPC servers in executors and schedulers.
+///
+/// # Examples
+///
+/// ```
+/// use ballista_core::utils::GrpcServerConfig;
+///
+/// let server_config = GrpcServerConfig::default();
+/// let server = ballista_core::utils::create_grpc_server(&server_config);
+/// ```
+#[derive(Debug, Clone)]
+pub struct GrpcServerConfig {
+ /// Request timeout in seconds
+ pub timeout_seconds: u64,
+ /// TCP keep-alive interval in seconds
+ pub tcp_keepalive_seconds: u64,
+ /// HTTP/2 keep-alive ping interval in seconds
+ pub http2_keepalive_interval_seconds: u64,
+ /// HTTP/2 keep-alive ping timeout in seconds
+ pub http2_keepalive_timeout_seconds: u64,
+}
+
+impl Default for GrpcServerConfig {
+ fn default() -> Self {
+ Self {
+ timeout_seconds: 20,
+ tcp_keepalive_seconds: 3600,
+ http2_keepalive_interval_seconds: 300,
+ http2_keepalive_timeout_seconds: 20,
+ }
+ }
+}
+
/// Default session builder using the provided configuration
pub fn default_session_builder(
config: SessionConfig,
@@ -106,31 +196,40 @@ pub async fn collect_stream(
pub async fn create_grpc_client_connection<D>(
dst: D,
+ config: &GrpcClientConfig,
) -> std::result::Result<Channel, Error>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let endpoint = tonic::transport::Endpoint::new(dst)?
- .connect_timeout(Duration::from_secs(20))
- .timeout(Duration::from_secs(20))
+ .connect_timeout(Duration::from_secs(config.connect_timeout_seconds))
+ .timeout(Duration::from_secs(config.timeout_seconds))
// Disable Nagle's Algorithm since we don't want packets to wait
.tcp_nodelay(true)
- .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
- .http2_keep_alive_interval(Duration::from_secs(300))
+ .tcp_keepalive(Some(Duration::from_secs(config.tcp_keepalive_seconds)))
+ .http2_keep_alive_interval(Duration::from_secs(
+ config.http2_keepalive_interval_seconds,
+ ))
+ // Use a fixed timeout for keep-alive pings to keep configuration
simple
+ // since this is a standalone configuration
.keep_alive_timeout(Duration::from_secs(20))
.keep_alive_while_idle(true);
endpoint.connect().await
}
-pub fn create_grpc_server() -> Server {
+pub fn create_grpc_server(config: &GrpcServerConfig) -> Server {
Server::builder()
- .timeout(Duration::from_secs(20))
+ .timeout(Duration::from_secs(config.timeout_seconds))
// Disable Nagle's Algorithm since we don't want packets to wait
.tcp_nodelay(true)
- .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
- .http2_keepalive_interval(Option::Some(Duration::from_secs(300)))
- .http2_keepalive_timeout(Option::Some(Duration::from_secs(20)))
+ .tcp_keepalive(Some(Duration::from_secs(config.tcp_keepalive_seconds)))
+ .http2_keepalive_interval(Some(Duration::from_secs(
+ config.http2_keepalive_interval_seconds,
+ )))
+ .http2_keepalive_timeout(Some(Duration::from_secs(
+ config.http2_keepalive_timeout_seconds,
+ )))
}
pub fn collect_plan_metrics(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs
index 9fa2472b..7e486bd1 100644
--- a/ballista/executor/src/config.rs
+++ b/ballista/executor/src/config.rs
@@ -139,6 +139,7 @@ impl TryFrom<Config> for ExecutorProcessConfig {
job_data_clean_up_interval_seconds:
opt.job_data_clean_up_interval_seconds,
grpc_max_decoding_message_size:
opt.grpc_server_max_decoding_message_size,
grpc_max_encoding_message_size:
opt.grpc_server_max_encoding_message_size,
+ grpc_server_config:
ballista_core::utils::GrpcServerConfig::default(),
executor_heartbeat_interval_seconds:
opt.executor_heartbeat_interval_seconds,
override_execution_engine: None,
override_function_registry: None,
diff --git a/ballista/executor/src/executor_process.rs
b/ballista/executor/src/executor_process.rs
index d365f3cb..4756d89b 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -42,6 +42,7 @@ use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use ballista_core::config::{LogRotationPolicy, TaskSchedulingPolicy};
use ballista_core::error::BallistaError;
+use ballista_core::extension::SessionConfigExt;
use ballista_core::serde::protobuf::executor_resource::Resource;
use ballista_core::serde::protobuf::executor_status::Status;
use ballista_core::serde::protobuf::{
@@ -53,7 +54,7 @@ use ballista_core::serde::{
};
use ballista_core::utils::{
create_grpc_client_connection, create_grpc_server, default_config_producer,
- get_time_before,
+ get_time_before, GrpcServerConfig,
};
use ballista_core::{ConfigProducer, RuntimeProducer, BALLISTA_VERSION};
@@ -88,6 +89,8 @@ pub struct ExecutorProcessConfig {
pub grpc_max_decoding_message_size: u32,
/// The maximum size of an encoded message
pub grpc_max_encoding_message_size: u32,
+ /// gRPC server timeout configuration
+ pub grpc_server_config: GrpcServerConfig,
pub executor_heartbeat_interval_seconds: u64,
/// Optional execution engine to use to execute physical plans, will
default to
/// DataFusion if none is provided.
@@ -139,6 +142,7 @@ impl Default for ExecutorProcessConfig {
job_data_clean_up_interval_seconds: 0,
grpc_max_decoding_message_size: 16777216,
grpc_max_encoding_message_size: 16777216,
+ grpc_server_config: Default::default(),
executor_heartbeat_interval_seconds: 60,
override_execution_engine: None,
override_function_registry: None,
@@ -245,8 +249,10 @@ pub async fn start_executor_process(
));
let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
+ let session_config = (executor.config_producer)();
+ let ballista_config = session_config.ballista_config();
let connection = if connect_timeout == 0 {
- create_grpc_client_connection(scheduler_url)
+ create_grpc_client_connection(scheduler_url,
&(&ballista_config).into())
.await
.map_err(|_| {
BallistaError::GrpcConnectionError(
@@ -262,13 +268,16 @@ pub async fn start_executor_process(
while x.is_none()
&& Instant::now().elapsed().as_secs() - start_time <
connect_timeout
{
- match create_grpc_client_connection(scheduler_url.clone())
- .await
- .map_err(|_| {
- BallistaError::GrpcConnectionError(
- "Could not connect to scheduler".to_string(),
- )
- }) {
+ match create_grpc_client_connection(
+ scheduler_url.clone(),
+ &(&ballista_config).into(),
+ )
+ .await
+ .map_err(|_| {
+ BallistaError::GrpcConnectionError(
+ "Could not connect to scheduler".to_string(),
+ )
+ }) {
Ok(connection) => {
info!("Connected to scheduler at {scheduler_url}");
x = Some(connection);
@@ -371,12 +380,13 @@ pub async fn start_executor_process(
shutdown,
opt.grpc_max_encoding_message_size as usize,
opt.grpc_max_decoding_message_size as usize,
+ opt.grpc_server_config.clone(),
)
.await
}
Some(flight_provider) => {
info!("Starting custom, user provided, arrow flight service");
- (flight_provider)(address, shutdown)
+ (flight_provider)(address, shutdown,
opt.grpc_server_config.clone())
}
});
@@ -480,11 +490,12 @@ async fn flight_server_task(
mut grpc_shutdown: Shutdown,
max_encoding_message_size: usize,
max_decoding_message_size: usize,
+ grpc_server_config: GrpcServerConfig,
) -> JoinHandle<Result<(), BallistaError>> {
tokio::spawn(async move {
info!("Built-in arrow flight server listening on: {address:?}
max_encoding_size: {max_encoding_message_size} max_decoding_size:
{max_decoding_message_size}");
- let server_future = create_grpc_server()
+ let server_future = create_grpc_server(&grpc_server_config)
.add_service(
FlightServiceServer::new(BallistaFlightService::new())
.max_decoding_message_size(max_decoding_message_size)
@@ -722,13 +733,15 @@ mod tests {
async fn test_arrow_flight_provider_ergonomics() {
let config = crate::executor_process::ExecutorProcessConfig {
override_arrow_flight_service: Some(std::sync::Arc::new(
- move |address, mut grpc_shutdown| {
+ move |address, mut grpc_shutdown, ballista_config| {
tokio::spawn(async move {
log::info!(
"custom arrow flight server listening on:
{address:?}"
);
- let server_future =
ballista_core::utils::create_grpc_server()
+ let server_future =
ballista_core::utils::create_grpc_server(
+ &ballista_config,
+ )
.add_service(
arrow_flight::flight_service_server::FlightServiceServer::new(
crate::flight_service::BallistaFlightService::new(),
diff --git a/ballista/executor/src/executor_server.rs
b/ballista/executor/src/executor_server.rs
index 8ca90c3d..1b40650e 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -28,6 +28,7 @@ use tonic::transport::Channel;
use tonic::{Request, Response, Status};
use ballista_core::error::BallistaError;
+use ballista_core::extension::SessionConfigExt;
use ballista_core::serde::protobuf::{
executor_grpc_server::{ExecutorGrpc, ExecutorGrpcServer},
executor_metric, executor_status,
@@ -104,6 +105,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static
+ AsExecutionPlan>(
let executor_meta = executor.metadata.clone();
let addr = format!("{}:{}", config.bind_host, executor_meta.grpc_port);
let addr = addr.parse().unwrap();
+ let grpc_server_config = config.grpc_server_config.clone();
info!(
"Ballista v{BALLISTA_VERSION} Rust Executor Grpc Server listening
on {addr:?}"
@@ -114,7 +116,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static
+ AsExecutionPlan>(
let mut grpc_shutdown = shutdown_noti.subscribe_for_shutdown();
tokio::spawn(async move {
let shutdown_signal = grpc_shutdown.recv();
- let grpc_server_future = create_grpc_server()
+ let grpc_server_future = create_grpc_server(&grpc_server_config)
.add_service(server)
.serve_with_shutdown(addr, shutdown_signal);
grpc_server_future.await.map_err(|e| {
@@ -235,7 +237,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
Ok(scheduler)
} else {
let scheduler_url = format!("http://{scheduler_id}");
- let connection =
create_grpc_client_connection(scheduler_url).await?;
+ let session_config = (self.executor.config_producer)();
+ let ballista_config = session_config.ballista_config();
+ let connection =
+ create_grpc_client_connection(scheduler_url,
&(&ballista_config).into())
+ .await?;
let scheduler = SchedulerGrpcClient::new(connection)
.max_encoding_message_size(self.grpc_max_encoding_message_size)
.max_decoding_message_size(self.grpc_max_decoding_message_size);
diff --git a/ballista/executor/src/lib.rs b/ballista/executor/src/lib.rs
index 26365162..e9322592 100644
--- a/ballista/executor/src/lib.rs
+++ b/ballista/executor/src/lib.rs
@@ -48,15 +48,21 @@ use ballista_core::serde::protobuf::{
TaskStatus,
};
use ballista_core::serde::scheduler::PartitionId;
+use ballista_core::utils::GrpcServerConfig;
/// [ArrowFlightServerProvider] provides a function which creates a new Arrow
Flight server.
///
/// The function should take three arguments:
/// [SocketAddr] - the address to bind the server to
/// [Shutdown] - a shutdown signal to gracefully shut down the server
+/// [GrpcServerConfig] - the gRPC server configuration for timeout settings
/// Returns a [tokio::task::JoinHandle] which will be registered as a service
handler
///
-pub type ArrowFlightServerProvider = dyn Fn(SocketAddr, Shutdown) ->
tokio::task::JoinHandle<Result<(), BallistaError>>
+pub type ArrowFlightServerProvider = dyn Fn(
+ SocketAddr,
+ Shutdown,
+ GrpcServerConfig,
+ ) -> tokio::task::JoinHandle<Result<(), BallistaError>>
+ Send
+ Sync;
diff --git a/ballista/executor/src/standalone.rs
b/ballista/executor/src/standalone.rs
index f439ad30..d9fa3c51 100644
--- a/ballista/executor/src/standalone.rs
+++ b/ballista/executor/src/standalone.rs
@@ -20,7 +20,7 @@ use crate::{execution_loop, executor::Executor,
flight_service::BallistaFlightSe
use arrow_flight::flight_service_server::FlightServiceServer;
use ballista_core::extension::SessionConfigExt;
use ballista_core::registry::BallistaFunctionRegistry;
-use ballista_core::utils::default_config_producer;
+use ballista_core::utils::{default_config_producer, GrpcServerConfig};
use ballista_core::{
error::Result,
serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient,
ExecutorRegistration},
@@ -124,7 +124,7 @@ pub async fn new_standalone_executor_from_builder(
.max_encoding_message_size(max_message_size);
tokio::spawn(
- create_grpc_server()
+ create_grpc_server(&GrpcServerConfig::default())
.add_service(server)
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
listener,
diff --git a/ballista/scheduler/src/standalone.rs
b/ballista/scheduler/src/standalone.rs
index cacc4e28..c4e6295d 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -23,6 +23,7 @@ use ballista_core::extension::SessionConfigExt;
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::{
create_grpc_server, default_config_producer, default_session_builder,
+ GrpcServerConfig,
};
use ballista_core::ConfigProducer;
use ballista_core::{
@@ -93,7 +94,7 @@ pub async fn new_standalone_scheduler_with_builder(
let addr = listener.local_addr()?;
info!("Ballista v{BALLISTA_VERSION} Rust Scheduler listening on {addr:?}");
tokio::spawn(
- create_grpc_server()
+ create_grpc_server(&GrpcServerConfig::default())
.add_service(server)
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
listener,
diff --git a/ballista/scheduler/src/state/executor_manager.rs
b/ballista/scheduler/src/state/executor_manager.rs
index 9c515d8f..d5f7297b 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -27,13 +27,16 @@ use crate::config::SchedulerConfig;
use crate::state::execution_graph::RunningTaskInfo;
use crate::state::task_manager::JobInfoCache;
+use ballista_core::extension::SessionConfigExt;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
use ballista_core::serde::protobuf::{
executor_status, CancelTasksParams, ExecutorHeartbeat, MultiTaskDefinition,
RemoveJobDataParams, StopExecutorParams,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
-use ballista_core::utils::{create_grpc_client_connection, get_time_before};
+use ballista_core::utils::{
+ create_grpc_client_connection, get_time_before, GrpcClientConfig,
+};
use dashmap::DashMap;
use log::{debug, error, info, warn};
use std::collections::{HashMap, HashSet};
@@ -48,6 +51,7 @@ pub struct ExecutorManager {
config: Arc<SchedulerConfig>,
clients: ExecutorClients,
pending_cleanup_jobs: Arc<DashMap<String, HashSet<String>>>,
+ grpc_client_config: GrpcClientConfig,
}
impl ExecutorManager {
@@ -55,11 +59,20 @@ impl ExecutorManager {
cluster_state: Arc<dyn ClusterState>,
config: Arc<SchedulerConfig>,
) -> Self {
+ let grpc_client_config =
+ if let Some(config_producer) = &config.override_config_producer {
+ let session_config = config_producer();
+ let ballista_config = session_config.ballista_config();
+ GrpcClientConfig::from(&ballista_config)
+ } else {
+ GrpcClientConfig::default()
+ };
Self {
cluster_state,
config,
clients: Default::default(),
pending_cleanup_jobs: Default::default(),
+ grpc_client_config,
}
}
@@ -117,7 +130,10 @@ impl ExecutorManager {
let executor_manager = self.clone();
tokio::spawn(async move {
for (executor_id, infos) in tasks_to_cancel {
- if let Ok(mut client) =
executor_manager.get_client(&executor_id).await {
+ if let Ok(mut client) = executor_manager
+ .get_client(&executor_id,
&executor_manager.grpc_client_config)
+ .await
+ {
if let Err(e) = client
.cancel_tasks(CancelTasksParams { task_infos: infos })
.await
@@ -174,7 +190,9 @@ impl ExecutorManager {
let job_id_clone = job_id.to_owned();
if self.config.is_push_staged_scheduling() {
- if let Ok(mut client) = self.get_client(&executor).await {
+ if let Ok(mut client) =
+ self.get_client(&executor, &self.grpc_client_config).await
+ {
tokio::spawn(async move {
if let Err(err) = client
.remove_job_data(RemoveJobDataParams {
@@ -272,7 +290,10 @@ impl ExecutorManager {
pub async fn stop_executor(&self, executor_id: &str, stop_reason: String) {
let executor_id = executor_id.to_string();
- match self.get_client(&executor_id).await {
+ match self
+ .get_client(&executor_id, &self.grpc_client_config)
+ .await
+ {
Ok(mut client) => {
tokio::task::spawn(async move {
match client
@@ -304,7 +325,9 @@ impl ExecutorManager {
multi_tasks: Vec<MultiTaskDefinition>,
scheduler_id: String,
) -> Result<()> {
- let mut client = self.get_client(executor_id).await?;
+ let mut client = self
+ .get_client(executor_id, &self.grpc_client_config)
+ .await?;
client
.launch_multi_task(protobuf::LaunchMultiTaskParams {
multi_tasks,
@@ -416,7 +439,11 @@ impl ExecutorManager {
.collect::<Vec<_>>()
}
- async fn get_client(&self, executor_id: &str) ->
Result<ExecutorGrpcClient<Channel>> {
+ async fn get_client(
+ &self,
+ executor_id: &str,
+ grpc_client_config: &GrpcClientConfig,
+ ) -> Result<ExecutorGrpcClient<Channel>> {
let client = self.clients.get(executor_id).map(|value| value.clone());
if let Some(client) = client {
@@ -427,7 +454,8 @@ impl ExecutorManager {
"http://{}:{}",
executor_metadata.host, executor_metadata.grpc_port
);
- let connection =
create_grpc_client_connection(executor_url).await?;
+ let connection =
+ create_grpc_client_connection(executor_url,
grpc_client_config).await?;
let client = ExecutorGrpcClient::new(connection);
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]