This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 379752e28 feat: Cluster RPC customisations to support TLS and custom headers (#1400)
379752e28 is described below
commit 379752e284defb498d22522112ccf0badbc5607f
Author: Phillip LeBlanc <[email protected]>
AuthorDate: Thu Jan 29 18:17:39 2026 +0900
feat: Cluster RPC customisations to support TLS and custom headers (#1400)
* Cluster RPC customizations to support TLS and custom headers
* Add TLS support to scheduler flight proxy service
- Update BallistaFlightProxyService to accept use_tls and
customize_endpoint parameters
- Add use_tls field to SchedulerConfig with with_use_tls() builder method
- Unify EndpointOverrideFn type across crates to use
ballista_core::extension definition
- Update flight proxy to use https/http scheme based on TLS configuration
- Apply custom endpoint configuration for TLS certificate setup
---
Cargo.lock | 5 +
ballista/core/Cargo.toml | 2 +-
ballista/core/src/client.rs | 35 +-
.../core/src/execution_plans/distributed_query.rs | 53 +-
.../core/src/execution_plans/shuffle_reader.rs | 73 ++-
ballista/core/src/extension.rs | 234 +++++++++
ballista/core/src/utils.rs | 77 ++-
ballista/executor/src/config.rs | 1 +
ballista/executor/src/execution_loop.rs | 14 +-
ballista/executor/src/executor_process.rs | 74 ++-
ballista/executor/src/executor_server.rs | 26 +-
ballista/scheduler/src/config.rs | 25 +
ballista/scheduler/src/flight_proxy_service.rs | 40 +-
ballista/scheduler/src/scheduler_process.rs | 9 +
ballista/scheduler/src/state/executor_manager.rs | 20 +-
examples/Cargo.toml | 12 +
examples/examples/mtls-cluster.rs | 534 +++++++++++++++++++++
examples/examples/standalone-substrait.rs | 7 +-
18 files changed, 1166 insertions(+), 75 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 3fb69107f..1dbca79e0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1008,17 +1008,21 @@ dependencies = [
name = "ballista-examples"
version = "52.0.0"
dependencies = [
+ "arrow-flight",
"ballista",
"ballista-core",
"ballista-executor",
"ballista-scheduler",
"ctor",
"datafusion",
+ "datafusion-proto",
"datafusion-substrait",
"env_logger",
"futures",
"log",
"object_store",
+ "rustls",
+ "tempfile",
"testcontainers-modules",
"tokio",
"tonic",
@@ -5666,6 +5670,7 @@ dependencies = [
"socket2",
"sync_wrapper",
"tokio",
+ "tokio-rustls",
"tokio-stream",
"tower",
"tower-layer",
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 5ad3f7dce..22d89b59f 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -62,7 +62,7 @@ prost = { workspace = true }
prost-types = { workspace = true }
rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
-tokio = { workspace = true }
+tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-stream = { workspace = true, features = ["net"] }
tonic = { workspace = true }
tonic-prost = { workspace = true }
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index 53a9aedfa..c01d431ca 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -44,8 +44,11 @@ use datafusion::arrow::{
use datafusion::error::DataFusionError;
use datafusion::error::Result;
+use crate::extension::BallistaConfigGrpcEndpoint;
use crate::serde::protobuf;
-use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
+
+use crate::utils::create_grpc_client_endpoint;
+
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use log::{debug, warn};
@@ -69,17 +72,37 @@ impl BallistaClient {
host: &str,
port: u16,
max_message_size: usize,
+ use_tls: bool,
+ customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
) -> BResult<Self> {
- let addr = format!("http://{host}:{port}");
- let grpc_config = GrpcClientConfig::default();
+ let scheme = if use_tls { "https" } else { "http" };
+
+ let addr = format!("{scheme}://{host}:{port}");
debug!("BallistaClient connecting to {addr}");
- let connection = create_grpc_client_connection(addr.clone(),
&grpc_config)
- .await
+
+ let mut endpoint = create_grpc_client_endpoint(addr.clone(), None)
.map_err(|e| {
BallistaError::GrpcConnectionError(format!(
- "Error connecting to Ballista scheduler or executor at {addr}: {e:?}"
+ "Error creating endpoint to Ballista scheduler or executor at {addr}: {e:?}"
))
})?;
+
+ if let Some(customize) = customize_endpoint {
+ endpoint = customize
+ .configure_endpoint(endpoint)
+ .map_err(|e| {
+ BallistaError::GrpcConnectionError(format!(
+ "Error creating endpoint to Ballista scheduler or executor at {addr}: {e:?}"
+ ))
+ })?;
+ }
+
+ let connection = endpoint.connect().await.map_err(|e| {
+ BallistaError::GrpcConnectionError(format!(
+ "Error connecting to Ballista scheduler or executor at {addr}: {e:?}"
+ ))
+ })?;
+
let flight_client = FlightServiceClient::new(connection)
.max_decoding_message_size(max_message_size)
.max_encoding_message_size(max_message_size);
diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs
index e79edad1d..d7edac56b 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -17,6 +17,7 @@
use crate::client::BallistaClient;
use crate::config::BallistaConfig;
+use crate::extension::{BallistaConfigGrpcEndpoint, SessionConfigExt};
use crate::serde::protobuf::get_job_status_result::FlightProxy;
use crate::serde::protobuf::{
ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
@@ -24,7 +25,7 @@ use crate::serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient,
};
use crate::serde::protobuf::{ExecutorMetadata, SuccessfulJob};
-use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
+use crate::utils::{GrpcClientConfig, create_grpc_client_endpoint};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
@@ -40,6 +41,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics,
};
+use datafusion::prelude::SessionConfig;
use datafusion_proto::logical_plan::{
AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
};
@@ -243,6 +245,8 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for
DistributedQueryExec<T> {
let metric_total_bytes =
MetricBuilder::new(&self.metrics).counter("transferred_bytes",
partition);
+ let session_config = context.session_config().clone();
+
let stream = futures::stream::once(
execute_query(
self.scheduler_url.clone(),
@@ -252,6 +256,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for
DistributedQueryExec<T> {
GrpcClientConfig::from(&self.config),
Arc::new(self.metrics.clone()),
partition,
+ session_config,
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
@@ -283,6 +288,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for
DistributedQueryExec<T> {
}
}
+#[allow(clippy::too_many_arguments)]
async fn execute_query(
scheduler_url: String,
session_id: String,
@@ -291,19 +297,39 @@ async fn execute_query(
grpc_config: GrpcClientConfig,
metrics: Arc<ExecutionPlanMetricsSet>,
partition: usize,
+ session_config: SessionConfig,
) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
+ let grpc_interceptor = session_config.ballista_grpc_interceptor();
+ let customize_endpoint =
+ session_config.ballista_override_create_grpc_client_endpoint();
+ let use_tls = session_config.ballista_use_tls();
+
// Capture query submission time for total_query_time_ms
let query_start_time = std::time::Instant::now();
info!("Connecting to Ballista scheduler at {scheduler_url}");
// TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again
- let connection = create_grpc_client_connection(scheduler_url.clone(),
&grpc_config)
+ let mut endpoint =
+ create_grpc_client_endpoint(scheduler_url.clone(), Some(&grpc_config))
+ .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+
+ if let Some(ref customize) = customize_endpoint {
+ endpoint = customize
+ .configure_endpoint(endpoint)
+ .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+ }
+
+ let connection = endpoint
+ .connect()
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
- let mut scheduler = SchedulerGrpcClient::new(connection)
- .max_encoding_message_size(max_message_size)
- .max_decoding_message_size(max_message_size);
+ let mut scheduler = SchedulerGrpcClient::with_interceptor(
+ connection,
+ grpc_interceptor.as_ref().clone(),
+ )
+ .max_encoding_message_size(max_message_size)
+ .max_decoding_message_size(max_message_size);
let query_result = scheduler
.execute_query(query)
@@ -414,6 +440,8 @@ async fn execute_query(
true,
scheduler_url.clone(),
flight_proxy.clone(),
+ customize_endpoint.clone(),
+ use_tls,
)
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
@@ -477,6 +505,8 @@ async fn fetch_partition(
flight_transport: bool,
scheduler_url: String,
flight_proxy: Option<FlightProxy>,
+ customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+ use_tls: bool,
) -> Result<SendableRecordBatchStream> {
let metadata = location.executor_meta.ok_or_else(|| {
DataFusionError::Internal("Received empty executor metadata".to_owned())
@@ -491,10 +521,15 @@ async fn fetch_partition(
let (client_host, client_port) =
get_client_host_port(&metadata, &scheduler_url, &flight_proxy)?;
- let mut ballista_client =
- BallistaClient::try_new(client_host.as_str(), client_port,
max_message_size)
- .await
- .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+ let mut ballista_client = BallistaClient::try_new(
+ client_host.as_str(),
+ client_port,
+ max_message_size,
+ use_tls,
+ customize_endpoint,
+ )
+ .await
+ .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
ballista_client
.fetch_partition(
&metadata.id,
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs
b/ballista/core/src/execution_plans/shuffle_reader.rs
index 7de252c9c..1b2de8613 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -33,7 +33,7 @@ use crate::client::BallistaClient;
use crate::execution_plans::sort_shuffle::{
get_index_path, is_sort_shuffle_output, stream_sort_shuffle_partition,
};
-use crate::extension::SessionConfigExt;
+use crate::extension::{BallistaConfigGrpcEndpoint, SessionConfigExt};
use crate::serde::scheduler::{PartitionLocation, PartitionStats};
use datafusion::arrow::datatypes::SchemaRef;
@@ -169,6 +169,8 @@ impl ExecutionPlan for ShuffleReaderExec {
let force_remote_read =
config.ballista_shuffle_reader_force_remote_read();
let prefer_flight =
config.ballista_shuffle_reader_remote_prefer_flight();
let batch_size = config.batch_size();
+ let customize_endpoint =
config.ballista_override_create_grpc_client_endpoint();
+ let use_tls = config.ballista_use_tls();
if force_remote_read {
debug!(
@@ -202,6 +204,8 @@ impl ExecutionPlan for ShuffleReaderExec {
max_message_size,
force_remote_read,
prefer_flight,
+ customize_endpoint,
+ use_tls,
);
let input_stream = Box::pin(RecordBatchStreamAdapter::new(
@@ -404,6 +408,8 @@ fn send_fetch_partitions(
max_message_size: usize,
force_remote_read: bool,
flight_transport: bool,
+ customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+ use_tls: bool,
) -> AbortableReceiverStream {
let (response_sender, response_receiver) = mpsc::channel(max_request_num);
let semaphore = Arc::new(Semaphore::new(max_request_num));
@@ -420,10 +426,17 @@ fn send_fetch_partitions(
// keep local shuffle files reading in serial order for memory control.
let response_sender_c = response_sender.clone();
+ let customize_endpoint_c = customize_endpoint.clone();
spawned_tasks.push(SpawnedTask::spawn(async move {
for p in local_locations {
let r = PartitionReaderEnum::Local
- .fetch_partition(&p, max_message_size, flight_transport)
+ .fetch_partition(
+ &p,
+ max_message_size,
+ flight_transport,
+ customize_endpoint_c.clone(),
+ use_tls,
+ )
.await;
if let Err(e) = response_sender_c.send(r).await {
error!("Fail to send response event to the channel due to {e}");
@@ -434,11 +447,18 @@ fn send_fetch_partitions(
for p in remote_locations.into_iter() {
let semaphore = semaphore.clone();
let response_sender = response_sender.clone();
+ let customize_endpoint_c = customize_endpoint.clone();
spawned_tasks.push(SpawnedTask::spawn(async move {
// Block if exceeds max request number.
let permit = semaphore.acquire_owned().await.unwrap();
let r = PartitionReaderEnum::FlightRemote
- .fetch_partition(&p, max_message_size, flight_transport)
+ .fetch_partition(
+ &p,
+ max_message_size,
+ flight_transport,
+ customize_endpoint_c,
+ use_tls,
+ )
.await;
// Block if the channel buffer is full.
if let Err(e) = response_sender.send(r).await {
@@ -465,6 +485,8 @@ trait PartitionReader: Send + Sync + Clone {
location: &PartitionLocation,
max_message_size: usize,
flight_transport: bool,
+ customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+ use_tls: bool,
) -> result::Result<SendableRecordBatchStream, BallistaError>;
}
@@ -484,10 +506,19 @@ impl PartitionReader for PartitionReaderEnum {
location: &PartitionLocation,
max_message_size: usize,
flight_transport: bool,
+ customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+ use_tls: bool,
) -> result::Result<SendableRecordBatchStream, BallistaError> {
match self {
PartitionReaderEnum::FlightRemote => {
- fetch_partition_remote(location, max_message_size,
flight_transport).await
+ fetch_partition_remote(
+ location,
+ max_message_size,
+ flight_transport,
+ customize_endpoint,
+ use_tls,
+ )
+ .await
}
PartitionReaderEnum::Local =>
fetch_partition_local(location).await,
PartitionReaderEnum::ObjectStoreRemote => {
@@ -501,6 +532,8 @@ async fn fetch_partition_remote(
location: &PartitionLocation,
max_message_size: usize,
flight_transport: bool,
+ customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+ use_tls: bool,
) -> result::Result<SendableRecordBatchStream, BallistaError> {
let metadata = &location.executor_meta;
let partition_id = &location.partition_id;
@@ -508,18 +541,24 @@ async fn fetch_partition_remote(
// And we should also avoid to keep alive too many connections for long
time.
let host = metadata.host.as_str();
let port = metadata.port;
- let mut ballista_client = BallistaClient::try_new(host, port,
max_message_size)
- .await
- .map_err(|error| match error {
- // map grpc connection error to partition fetch error.
- BallistaError::GrpcConnectionError(msg) =>
BallistaError::FetchFailed(
- metadata.id.clone(),
- partition_id.stage_id,
- partition_id.partition_id,
- msg,
- ),
- other => other,
- })?;
+ let mut ballista_client = BallistaClient::try_new(
+ host,
+ port,
+ max_message_size,
+ use_tls,
+ customize_endpoint,
+ )
+ .await
+ .map_err(|error| match error {
+ // map grpc connection error to partition fetch error.
+ BallistaError::GrpcConnectionError(msg) => BallistaError::FetchFailed(
+ metadata.id.clone(),
+ partition_id.stage_id,
+ partition_id.partition_id,
+ msg,
+ ),
+ other => other,
+ })?;
ballista_client
.fetch_partition(
@@ -1087,6 +1126,8 @@ mod tests {
4 * 1024 * 1024,
false,
true,
+ None,
+ false,
);
let stream = RecordBatchStreamAdapter::new(
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index 9021ce2f4..f7b02395d 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -30,7 +30,18 @@ use
datafusion::execution::session_state::SessionStateBuilder;
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::protobuf::LogicalPlanNode;
+use std::collections::HashMap;
+use std::error::Error;
use std::sync::Arc;
+use tonic::codegen::http::HeaderName;
+use tonic::metadata::MetadataMap;
+use tonic::service::Interceptor;
+use tonic::transport::Endpoint;
+use tonic::{Request, Status};
+
+/// Type alias for the endpoint override function used in gRPC client
configuration
+pub type EndpointOverrideFn =
+ Arc<dyn Fn(Endpoint) -> Result<Endpoint, Box<dyn Error + Send + Sync>> +
Send + Sync>;
/// Provides methods which adapt [SessionState]
/// for Ballista usage
@@ -143,6 +154,29 @@ pub trait SessionConfigExt {
self,
prefer_flight: bool,
) -> Self;
+
+ /// Set user defined metadata keys in Ballista gRPC requests
+ fn with_ballista_grpc_metadata(self, metadata: HashMap<String, String>) ->
Self;
+
+ /// Get a `tonic` interceptor configured to decorate the provided metadata
keys
+ fn ballista_grpc_interceptor(&self) ->
Arc<BallistaGrpcMetadataInterceptor>;
+
+ /// Set a custom endpoint override function for gRPC client endpoint
configuration
+ fn with_ballista_override_create_grpc_client_endpoint(
+ self,
+ override_f: EndpointOverrideFn,
+ ) -> Self;
+
+ /// Get the custom endpoint override function for gRPC client endpoint
configuration
+ fn ballista_override_create_grpc_client_endpoint(
+ &self,
+ ) -> Option<Arc<BallistaConfigGrpcEndpoint>>;
+
+ /// Set whether to use TLS for executor connections (cluster-wide setting)
+ fn with_ballista_use_tls(self, use_tls: bool) -> Self;
+
+ /// Get whether to use TLS for executor connections
+ fn ballista_use_tls(&self) -> bool;
}
/// [SessionConfigHelperExt] is set of [SessionConfig] extension methods
@@ -389,6 +423,44 @@ impl SessionConfigExt for SessionConfig {
.set_bool(BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT,
prefer_flight)
}
}
+
+ fn with_ballista_grpc_metadata(self, metadata: HashMap<String, String>) ->
Self {
+ let extension = BallistaGrpcMetadataInterceptor::new(metadata);
+ self.with_extension(Arc::new(extension))
+ }
+
+ fn ballista_grpc_interceptor(&self) ->
Arc<BallistaGrpcMetadataInterceptor> {
+ self.get_extension::<BallistaGrpcMetadataInterceptor>()
+ .unwrap_or_default()
+ }
+
+ fn with_ballista_override_create_grpc_client_endpoint(
+ self,
+ override_f: Arc<
+ dyn Fn(Endpoint) -> Result<Endpoint, Box<dyn Error + Send + Sync>>
+ + Send
+ + Sync,
+ >,
+ ) -> Self {
+ let extension = BallistaConfigGrpcEndpoint::new(override_f);
+ self.with_extension(Arc::new(extension))
+ }
+
+ fn ballista_override_create_grpc_client_endpoint(
+ &self,
+ ) -> Option<Arc<BallistaConfigGrpcEndpoint>> {
+ self.get_extension::<BallistaConfigGrpcEndpoint>()
+ }
+
+ fn with_ballista_use_tls(self, use_tls: bool) -> Self {
+ self.with_extension(Arc::new(BallistaUseTls(use_tls)))
+ }
+
+ fn ballista_use_tls(&self) -> bool {
+ self.get_extension::<BallistaUseTls>()
+ .map(|ext| ext.0)
+ .unwrap_or(false)
+ }
}
impl SessionConfigHelperExt for SessionConfig {
@@ -528,6 +600,71 @@ impl BallistaQueryPlannerExtension {
}
}
+/// Wrapper allowing additional metadata keys to be decorated to the scheduler
+/// gRPC request
+#[derive(Default, Clone)]
+pub struct BallistaGrpcMetadataInterceptor {
+ additional_metadata: HashMap<String, String>,
+}
+
+impl BallistaGrpcMetadataInterceptor {
+ /// Create a new interceptor with additional metadata
+ pub fn new(additional_metadata: HashMap<String, String>) -> Self {
+ Self {
+ additional_metadata,
+ }
+ }
+}
+
+impl Interceptor for BallistaGrpcMetadataInterceptor {
+ fn call(&mut self, mut request: Request<()>) -> Result<Request<()>,
Status> {
+ if self.additional_metadata.is_empty() {
+ Ok(request)
+ } else {
+ let mut request_headers =
request.metadata().clone().into_headers();
+ for (k, v) in &self.additional_metadata {
+ request_headers.insert(
+ HeaderName::from_bytes(k.as_bytes())
+ .map_err(|e| Status::invalid_argument(e.to_string()))?,
+ v.parse().map_err(|_e| {
+ Status::invalid_argument(format!(
+ "{v} is not a valid header value"
+ ))
+ })?,
+ );
+ }
+ *request.metadata_mut() =
MetadataMap::from_headers(request_headers);
+ Ok(request)
+ }
+ }
+}
+
+/// Wrapper for customizing gRPC client endpoint configuration.
+/// This allows configuring TLS, timeouts, and other transport settings.
+#[derive(Clone)]
+pub struct BallistaConfigGrpcEndpoint {
+ override_f: EndpointOverrideFn,
+}
+
+impl BallistaConfigGrpcEndpoint {
+ /// Create a new endpoint configuration wrapper with the given override
function
+ pub fn new(override_f: EndpointOverrideFn) -> Self {
+ Self { override_f }
+ }
+
+ /// Apply the custom configuration to an endpoint
+ pub fn configure_endpoint(
+ &self,
+ endpoint: Endpoint,
+ ) -> Result<Endpoint, Box<dyn Error + Send + Sync>> {
+ (self.override_f)(endpoint)
+ }
+}
+
+/// Wrapper for cluster-wide TLS configuration
+#[derive(Clone, Copy)]
+pub struct BallistaUseTls(pub bool);
+
#[cfg(test)]
mod test {
use datafusion::{
@@ -572,4 +709,101 @@ mod test {
.any(|p| p.key == "datafusion.catalog.information_schema")
)
}
+
+ #[test]
+ fn test_ballista_grpc_metadata_interceptor() {
+ use std::collections::HashMap;
+ use tonic::Request;
+ use tonic::service::Interceptor;
+
+ use super::BallistaGrpcMetadataInterceptor;
+
+ // Test empty interceptor passes through unchanged
+ let mut interceptor = BallistaGrpcMetadataInterceptor::default();
+ let request = Request::new(());
+ let result = interceptor.call(request).unwrap();
+ assert!(result.metadata().is_empty());
+
+ // Test interceptor adds metadata
+ let mut metadata = HashMap::new();
+ metadata.insert("x-api-key".to_string(), "test-key".to_string());
+ metadata.insert("x-custom-header".to_string(),
"custom-value".to_string());
+
+ let mut interceptor = BallistaGrpcMetadataInterceptor::new(metadata);
+ let request = Request::new(());
+ let result = interceptor.call(request).unwrap();
+
+ assert_eq!(
+ result
+ .metadata()
+ .get("x-api-key")
+ .unwrap()
+ .to_str()
+ .unwrap(),
+ "test-key"
+ );
+ assert_eq!(
+ result
+ .metadata()
+ .get("x-custom-header")
+ .unwrap()
+ .to_str()
+ .unwrap(),
+ "custom-value"
+ );
+ }
+
+ #[test]
+ fn test_ballista_grpc_metadata_via_session_config() {
+ use std::collections::HashMap;
+ use tonic::Request;
+ use tonic::service::Interceptor;
+
+ // Test that metadata set via SessionConfig is accessible via
interceptor
+ let mut metadata = HashMap::new();
+ metadata.insert("authorization".to_string(), "Bearer token123".to_string());
+
+ let config =
+ SessionConfig::new_with_ballista().with_ballista_grpc_metadata(metadata);
+
+ let interceptor = config.ballista_grpc_interceptor();
+ let mut interceptor = interceptor.as_ref().clone();
+
+ let request = Request::new(());
+ let result = interceptor.call(request).unwrap();
+
+ assert_eq!(
+ result
+ .metadata()
+ .get("authorization")
+ .unwrap()
+ .to_str()
+ .unwrap(),
+ "Bearer token123"
+ );
+ }
+
+ #[test]
+ fn test_ballista_endpoint_override_error_handling() {
+ use std::sync::Arc;
+ use tonic::transport::Endpoint;
+
+ use super::BallistaConfigGrpcEndpoint;
+
+ // Test that errors from override function are propagated
+ let override_fn: super::EndpointOverrideFn =
+ Arc::new(|_ep: Endpoint| Err("TLS configuration failed".into()));
+
+ let config_endpoint = BallistaConfigGrpcEndpoint::new(override_fn);
+ let endpoint = Endpoint::from_static("http://localhost:50051");
+ let result = config_endpoint.configure_endpoint(endpoint);
+
+ assert!(result.is_err());
+ assert!(
+ result
+ .unwrap_err()
+ .to_string()
+ .contains("TLS configuration failed")
+ );
+ }
}
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 75698233a..e7f4e2a57 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -36,7 +36,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fs::File, pin::Pin};
use tonic::codegen::StdError;
-use tonic::transport::{Channel, Error, Server};
+use tonic::transport::{Channel, Endpoint, Error, Server};
/// Configuration for gRPC client connections.
///
@@ -222,6 +222,34 @@ where
endpoint.connect().await
}
+/// Creates a gRPC client endpoint (without connecting) for customization.
+/// This is typically used when TLS or other custom configuration is needed.
+/// If `config` is provided, standard timeout and keepalive settings are
applied.
+pub fn create_grpc_client_endpoint<D>(
+ dst: D,
+ config: Option<&GrpcClientConfig>,
+) -> std::result::Result<Endpoint, Error>
+where
+ D: std::convert::TryInto<tonic::transport::Endpoint>,
+ D::Error: Into<StdError>,
+{
+ let endpoint = tonic::transport::Endpoint::new(dst)?;
+ if let Some(config) = config {
+ Ok(endpoint
+ .connect_timeout(Duration::from_secs(config.connect_timeout_seconds))
+ .timeout(Duration::from_secs(config.timeout_seconds))
+ .tcp_nodelay(true)
+ .tcp_keepalive(Some(Duration::from_secs(config.tcp_keepalive_seconds)))
+ .http2_keep_alive_interval(Duration::from_secs(
+ config.http2_keepalive_interval_seconds,
+ ))
+ .keep_alive_timeout(Duration::from_secs(20))
+ .keep_alive_while_idle(true))
+ } else {
+ Ok(endpoint)
+ }
+}
+
/// Creates a gRPC server builder with the specified configuration.
pub fn create_grpc_server(config: &GrpcServerConfig) -> Server {
Server::builder()
@@ -261,3 +289,50 @@ pub fn get_time_before(interval_seconds: u64) -> u64 {
.unwrap_or_else(|| Duration::from_secs(0))
.as_secs()
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_grpc_client_config_from_ballista_config() {
+ let ballista_config = BallistaConfig::default();
+ let grpc_config = GrpcClientConfig::from(&ballista_config);
+
+ // Verify the conversion picks up the right values
+ assert_eq!(
+ grpc_config.connect_timeout_seconds,
+ ballista_config.default_grpc_client_connect_timeout_seconds() as
u64
+ );
+ assert_eq!(
+ grpc_config.timeout_seconds,
+ ballista_config.default_grpc_client_timeout_seconds() as u64
+ );
+ assert_eq!(
+ grpc_config.tcp_keepalive_seconds,
+ ballista_config.default_grpc_client_tcp_keepalive_seconds() as u64
+ );
+ assert_eq!(
+ grpc_config.http2_keepalive_interval_seconds,
+
ballista_config.default_grpc_client_http2_keepalive_interval_seconds() as u64
+ );
+ }
+
+ #[test]
+ fn test_create_grpc_client_endpoint_with_config() {
+ let config = GrpcClientConfig {
+ connect_timeout_seconds: 10,
+ timeout_seconds: 30,
+ tcp_keepalive_seconds: 1800,
+ http2_keepalive_interval_seconds: 150,
+ };
+ let result = create_grpc_client_endpoint("http://localhost:50051",
Some(&config));
+ assert!(result.is_ok());
+ }
+
+ #[test]
+ fn test_create_grpc_client_endpoint_invalid_url() {
+ let result = create_grpc_client_endpoint("not a valid url", None);
+ assert!(result.is_err());
+ }
+}
diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs
index 524504c3e..2a58c9e92 100644
--- a/ballista/executor/src/config.rs
+++ b/ballista/executor/src/config.rs
@@ -176,6 +176,7 @@ impl TryFrom<Config> for ExecutorProcessConfig {
override_logical_codec: None,
override_physical_codec: None,
override_arrow_flight_service: None,
+ override_create_grpc_client_endpoint: None,
})
}
}
diff --git a/ballista/executor/src/execution_loop.rs
b/ballista/executor/src/execution_loop.rs
index 87e34a0ca..0d55793f2 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -46,7 +46,7 @@ use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::Arc, time::Duration};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
-use tonic::transport::Channel;
+use tonic::codegen::{Body, Bytes, StdError};
/// Main execution loop that polls the scheduler for available tasks.
///
@@ -56,11 +56,17 @@ use tonic::transport::Channel;
///
/// The loop respects the executor's concurrent task limit via a semaphore,
/// ensuring no more than the configured number of tasks run simultaneously.
-pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
- mut scheduler: SchedulerGrpcClient<Channel>,
+pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan, C>(
+ mut scheduler: SchedulerGrpcClient<C>,
executor: Arc<Executor>,
codec: BallistaCodec<T, U>,
-) -> Result<(), BallistaError> {
+) -> Result<(), BallistaError>
+where
+ C: tonic::client::GrpcService<tonic::body::Body>,
+ C::Error: Into<StdError>,
+ C::ResponseBody: Body<Data = Bytes> + Send + 'static,
+ <C::ResponseBody as Body>::Error: Into<StdError> + Send,
+{
let executor_specification: ExecutorSpecification = executor
.metadata
.specification
diff --git a/ballista/executor/src/executor_process.rs
b/ballista/executor/src/executor_process.rs
index 16a5d2e45..702c8976b 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -42,7 +42,7 @@ use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use ballista_core::config::{LogRotationPolicy, TaskSchedulingPolicy};
use ballista_core::error::BallistaError;
-use ballista_core::extension::SessionConfigExt;
+use ballista_core::extension::{EndpointOverrideFn, SessionConfigExt};
use ballista_core::serde::protobuf::executor_resource::Resource;
use ballista_core::serde::protobuf::executor_status::Status;
use ballista_core::serde::protobuf::{
@@ -53,7 +53,7 @@ use ballista_core::serde::{
BallistaCodec, BallistaLogicalExtensionCodec,
BallistaPhysicalExtensionCodec,
};
use ballista_core::utils::{
- GrpcServerConfig, create_grpc_client_connection, create_grpc_server,
+ GrpcClientConfig, GrpcServerConfig, create_grpc_client_endpoint,
create_grpc_server,
default_config_producer, get_time_before,
};
use ballista_core::{BALLISTA_VERSION, ConfigProducer, RuntimeProducer};
@@ -129,6 +129,8 @@ pub struct ExecutorProcessConfig {
pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
/// [ArrowFlightServerProvider] implementation override option
pub override_arrow_flight_service: Option<Arc<ArrowFlightServerProvider>>,
+ /// Override function for customizing gRPC client endpoints before they
are used
+ pub override_create_grpc_client_endpoint: Option<EndpointOverrideFn>,
}
impl ExecutorProcessConfig {
@@ -174,6 +176,7 @@ impl Default for ExecutorProcessConfig {
override_logical_codec: None,
override_physical_codec: None,
override_arrow_flight_service: None,
+ override_create_grpc_client_endpoint: None,
}
}
}
@@ -284,14 +287,28 @@ pub async fn start_executor_process(
let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
let session_config = (executor.config_producer)();
let ballista_config = session_config.ballista_config();
+ let grpc_config = GrpcClientConfig::from(&ballista_config);
let connection = if connect_timeout == 0 {
- create_grpc_client_connection(scheduler_url,
&(&ballista_config).into())
- .await
+ let mut endpoint = create_grpc_client_endpoint(scheduler_url,
Some(&grpc_config))
.map_err(|_| {
BallistaError::GrpcConnectionError(
- "Could not connect to scheduler".to_string(),
+ "Could not create endpoint to scheduler".to_string(),
)
- })
+ })?;
+
+ if let Some(ref override_fn) =
opt.override_create_grpc_client_endpoint {
+ endpoint = override_fn(endpoint).map_err(|_| {
+ BallistaError::GrpcConnectionError(
+ "Failed to apply endpoint override".to_string(),
+ )
+ })?;
+ }
+
+ endpoint.connect().await.map_err(|_| {
+ BallistaError::GrpcConnectionError(
+ "Could not connect to scheduler".to_string(),
+ )
+ })
} else {
// this feature was added to support docker-compose so that we can
have the executor
// wait for the scheduler to start, or at least run for 10 seconds
before failing so
@@ -301,23 +318,40 @@ pub async fn start_executor_process(
while x.is_none()
&& Instant::now().elapsed().as_secs() - start_time <
connect_timeout
{
- match create_grpc_client_connection(
- scheduler_url.clone(),
- &(&ballista_config).into(),
- )
- .await
- .map_err(|_| {
- BallistaError::GrpcConnectionError(
- "Could not connect to scheduler".to_string(),
- )
- }) {
- Ok(connection) => {
- info!("Connected to scheduler at {scheduler_url}");
- x = Some(connection);
+ match create_grpc_client_endpoint(scheduler_url.clone(),
Some(&grpc_config)) {
+ Ok(mut endpoint) => {
+ if let Some(ref override_fn) =
+ opt.override_create_grpc_client_endpoint
+ {
+ match override_fn(endpoint) {
+ Ok(overridden_endpoint) => endpoint =
overridden_endpoint,
+ Err(e) => {
+ warn!(
+ "Failed to apply endpoint override to
scheduler at {scheduler_url} ({e}); retrying ..."
+ );
+
tokio::time::sleep(time::Duration::from_millis(500))
+ .await;
+ continue;
+ }
+ }
+ }
+
+ match endpoint.connect().await {
+ Ok(connection) => {
+ info!("Connected to scheduler at {scheduler_url}");
+ x = Some(connection);
+ }
+ Err(e) => {
+ warn!(
+ "Failed to connect to scheduler at
{scheduler_url} ({e}); retrying ..."
+ );
+
tokio::time::sleep(time::Duration::from_millis(500)).await;
+ }
+ }
}
Err(e) => {
warn!(
- "Failed to connect to scheduler at {scheduler_url}
({e}); retrying ..."
+ "Failed to create endpoint to scheduler at
{scheduler_url} ({e}); retrying ..."
);
tokio::time::sleep(time::Duration::from_millis(500)).await;
}
diff --git a/ballista/executor/src/executor_server.rs
b/ballista/executor/src/executor_server.rs
index a3da987b8..c01f282ad 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -34,7 +34,7 @@ use tonic::transport::Channel;
use tonic::{Request, Response, Status};
use ballista_core::error::BallistaError;
-use ballista_core::extension::SessionConfigExt;
+use ballista_core::extension::EndpointOverrideFn;
use ballista_core::serde::BallistaCodec;
use ballista_core::serde::protobuf::{
CancelTasksParams, CancelTasksResult, ExecutorMetric, ExecutorStatus,
@@ -47,10 +47,12 @@ use ballista_core::serde::protobuf::{
};
use ballista_core::serde::scheduler::PartitionId;
use ballista_core::serde::scheduler::TaskDefinition;
+
use ballista_core::serde::scheduler::from_proto::{
get_task_definition, get_task_definition_vec,
};
-use ballista_core::utils::{create_grpc_client_connection, create_grpc_server};
+use ballista_core::utils::{create_grpc_client_endpoint, create_grpc_server};
+
use dashmap::DashMap;
use datafusion::execution::TaskContext;
use datafusion_proto::{logical_plan::AsLogicalPlan,
physical_plan::AsExecutionPlan};
@@ -113,6 +115,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static
+ AsExecutionPlan>(
codec,
config.grpc_max_encoding_message_size as usize,
config.grpc_max_decoding_message_size as usize,
+ config.override_create_grpc_client_endpoint.clone(),
);
// 1. Start executor grpc service
@@ -212,6 +215,7 @@ pub struct ExecutorServer<T: 'static + AsLogicalPlan, U:
'static + AsExecutionPl
grpc_max_encoding_message_size: usize,
/// Maximum size for incoming gRPC messages.
grpc_max_decoding_message_size: usize,
+ override_create_grpc_client_endpoint: Option<EndpointOverrideFn>,
}
#[derive(Clone)]
@@ -238,6 +242,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
codec: BallistaCodec<T, U>,
grpc_max_encoding_message_size: usize,
grpc_max_decoding_message_size: usize,
+ override_create_grpc_client_endpoint: Option<EndpointOverrideFn>,
) -> Self {
Self {
_start_time: SystemTime::now()
@@ -251,6 +256,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
schedulers: Default::default(),
grpc_max_encoding_message_size,
grpc_max_decoding_message_size,
+ override_create_grpc_client_endpoint,
}
}
@@ -264,11 +270,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
Ok(scheduler)
} else {
let scheduler_url = format!("http://{scheduler_id}");
- let session_config = (self.executor.config_producer)();
- let ballista_config = session_config.ballista_config();
- let connection =
- create_grpc_client_connection(scheduler_url,
&(&ballista_config).into())
- .await?;
+ let mut endpoint = create_grpc_client_endpoint(scheduler_url,
None)?;
+
+ if let Some(ref override_fn) =
self.override_create_grpc_client_endpoint {
+ endpoint = override_fn(endpoint).map_err(|e| {
+ BallistaError::GrpcConnectionError(format!(
+ "Failed to customize endpoint for scheduler
{scheduler_id}: {e}"
+ ))
+ })?;
+ }
+
+ let connection = endpoint.connect().await?;
let scheduler = SchedulerGrpcClient::new(connection)
.max_encoding_message_size(self.grpc_max_encoding_message_size)
.max_decoding_message_size(self.grpc_max_decoding_message_size);
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index bbb82f5f5..49d4b3f49 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -27,6 +27,7 @@
use crate::SessionBuilder;
use crate::cluster::DistributionPolicy;
+use ballista_core::extension::EndpointOverrideFn;
use ballista_core::{ConfigProducer, config::TaskSchedulingPolicy};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
@@ -240,6 +241,10 @@ pub struct SchedulerConfig {
pub override_logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
/// [PhysicalExtensionCodec] override option
pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
+ /// Override function for customizing gRPC client endpoints before they are used
+ pub override_create_grpc_client_endpoint: Option<EndpointOverrideFn>,
+ /// Whether to use TLS when connecting to executors (for flight proxy)
+ pub use_tls: bool,
}
impl Default for SchedulerConfig {
@@ -266,6 +271,8 @@ impl Default for SchedulerConfig {
override_session_builder: None,
override_logical_codec: None,
override_physical_codec: None,
+ override_create_grpc_client_endpoint: None,
+ use_tls: false,
}
}
}
@@ -385,6 +392,22 @@ impl SchedulerConfig {
self.override_session_builder = Some(override_session_builder);
self
}
+
+    /// Installs `override_fn` as the hook used to customize gRPC client
+    /// endpoints, replacing any previously configured override.
+    /// This allows configuring TLS, timeouts, and other transport settings.
+    pub fn with_override_create_grpc_client_endpoint(
+        self,
+        override_fn: EndpointOverrideFn,
+    ) -> Self {
+        Self {
+            override_create_grpc_client_endpoint: Some(override_fn),
+            ..self
+        }
+    }
+
+    /// Sets whether TLS should be used when connecting to executors (for flight proxy).
+    ///
+    /// Returns the updated configuration so calls can be chained.
+    pub fn with_use_tls(self, use_tls: bool) -> Self {
+        Self { use_tls, ..self }
+    }
}
/// Policy of distributing tasks to available executor slots
@@ -495,6 +518,8 @@ impl TryFrom<Config> for SchedulerConfig {
override_logical_codec: None,
override_physical_codec: None,
override_session_builder: None,
+ override_create_grpc_client_endpoint: None,
+ use_tls: false,
};
Ok(config)
diff --git a/ballista/scheduler/src/flight_proxy_service.rs
b/ballista/scheduler/src/flight_proxy_service.rs
index bc5c63153..1513b3197 100644
--- a/ballista/scheduler/src/flight_proxy_service.rs
+++ b/ballista/scheduler/src/flight_proxy_service.rs
@@ -22,13 +22,15 @@ use arrow_flight::{
HandshakeRequest, HandshakeResponse, PollInfo, PutResult, SchemaResult,
Ticket,
};
use ballista_core::error::BallistaError;
+use ballista_core::extension::BallistaConfigGrpcEndpoint;
use ballista_core::serde::decode_protobuf;
use ballista_core::serde::scheduler::Action as BallistaAction;
-use ballista_core::utils::{GrpcClientConfig, create_grpc_client_connection};
+use ballista_core::utils::{GrpcClientConfig, create_grpc_client_endpoint};
use futures::{Stream, TryFutureExt};
use log::debug;
use std::pin::Pin;
+use std::sync::Arc;
use tonic::{Request, Response, Status, Streaming};
/// Service implementing a proxy from scheduler to executor Apache Arrow Flight Protocol
@@ -40,16 +42,24 @@ use tonic::{Request, Response, Status, Streaming};
pub struct BallistaFlightProxyService {
max_decoding_message_size: usize,
max_encoding_message_size: usize,
+ /// Whether to use TLS when connecting to executors
+ use_tls: bool,
+ /// Optional function to customize gRPC endpoint configuration (e.g., for TLS)
+ customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
}
impl BallistaFlightProxyService {
pub fn new(
max_decoding_message_size: usize,
max_encoding_message_size: usize,
+ use_tls: bool,
+ customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
) -> Self {
Self {
max_decoding_message_size,
max_encoding_message_size,
+ use_tls,
+ customize_endpoint,
}
}
}
@@ -120,6 +130,8 @@ impl FlightService for BallistaFlightProxyService {
*port,
self.max_decoding_message_size,
self.max_encoding_message_size,
+ self.use_tls,
+ self.customize_endpoint.clone(),
)
.map_err(|e| from_ballista_err(&e))
.await?;
@@ -169,16 +181,34 @@ async fn get_flight_client(
port: u16,
max_decoding_message_size: usize,
max_encoding_message_size: usize,
+ use_tls: bool,
+ customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
) -> Result<FlightServiceClient<tonic::transport::channel::Channel>,
BallistaError> {
- let addr = format!("http://{host}:{port}");
+ let scheme = if use_tls { "https" } else { "http" };
+ let addr = format!("{scheme}://{host}:{port}");
let grpc_config = GrpcClientConfig::default();
- let connection = create_grpc_client_connection(addr.clone(), &grpc_config)
- .await
+
+ let mut endpoint = create_grpc_client_endpoint(addr.clone(),
Some(&grpc_config))
.map_err(|e| {
BallistaError::GrpcConnectionError(format!(
- "Error connecting to Ballista scheduler or executor at {addr}:
{e:?}"
+ "Error creating endpoint for Ballista executor at {addr}:
{e:?}"
+ ))
+ })?;
+
+ if let Some(ref customize) = customize_endpoint {
+ endpoint = customize.configure_endpoint(endpoint).map_err(|e| {
+ BallistaError::GrpcConnectionError(format!(
+ "Error customizing endpoint for Ballista executor at {addr}:
{e}"
))
})?;
+ }
+
+ let connection = endpoint.connect().await.map_err(|e| {
+ BallistaError::GrpcConnectionError(format!(
+ "Error connecting to Ballista executor at {addr}: {e:?}"
+ ))
+ })?;
+
let flight_client = FlightServiceClient::new(connection)
.max_decoding_message_size(max_decoding_message_size)
.max_encoding_message_size(max_encoding_message_size);
diff --git a/ballista/scheduler/src/scheduler_process.rs
b/ballista/scheduler/src/scheduler_process.rs
index 679d2c411..3e6a97b2f 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -20,6 +20,7 @@ use crate::flight_proxy_service::BallistaFlightProxyService;
use arrow_flight::flight_service_server::FlightServiceServer;
use ballista_core::BALLISTA_VERSION;
use ballista_core::error::BallistaError;
+use ballista_core::extension::BallistaConfigGrpcEndpoint;
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer;
use ballista_core::serde::{
BallistaCodec, BallistaLogicalExtensionCodec,
BallistaPhysicalExtensionCodec,
@@ -102,9 +103,17 @@ pub async fn start_grpc_service<
match &config.advertise_flight_sql_endpoint {
Some(proxy) if proxy.is_empty() => {
info!("Adding embedded flight proxy service on scheduler");
+ // Wrap the endpoint override function in BallistaConfigGrpcEndpoint
+ let customize_endpoint = config
+ .override_create_grpc_client_endpoint
+ .clone()
+ .map(|f| Arc::new(BallistaConfigGrpcEndpoint::new(f)));
+
let flight_proxy =
FlightServiceServer::new(BallistaFlightProxyService::new(
config.grpc_server_max_encoding_message_size as usize,
config.grpc_server_max_decoding_message_size as usize,
+ config.use_tls,
+ customize_endpoint,
))
.max_decoding_message_size(
config.grpc_server_max_decoding_message_size as usize,
diff --git a/ballista/scheduler/src/state/executor_manager.rs
b/ballista/scheduler/src/state/executor_manager.rs
index 9ce158fab..3e36537a8 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -34,9 +34,11 @@ use ballista_core::serde::protobuf::{
StopExecutorParams, executor_status,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
+
use ballista_core::utils::{
- GrpcClientConfig, create_grpc_client_connection, get_time_before,
+ GrpcClientConfig, create_grpc_client_endpoint, get_time_before,
};
+
use dashmap::DashMap;
use log::{debug, error, info, warn};
use std::collections::{HashMap, HashSet};
@@ -473,8 +475,20 @@ impl ExecutorManager {
"http://{}:{}",
executor_metadata.host, executor_metadata.grpc_port
);
- let connection =
- create_grpc_client_connection(executor_url,
grpc_client_config).await?;
+ let mut endpoint =
+ create_grpc_client_endpoint(executor_url,
Some(grpc_client_config))?;
+
+ if let Some(ref override_fn) =
+ self.config.override_create_grpc_client_endpoint
+ {
+ endpoint = override_fn(endpoint).map_err(|e| {
+ BallistaError::GrpcConnectionError(format!(
+ "Failed to customize endpoint for executor
{executor_id}: {e}"
+ ))
+ })?;
+ }
+
+ let connection = endpoint.connect().await?;
let client = ExecutorGrpcClient::new(connection);
{
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 255b28c79..b91bfc3a2 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -33,20 +33,30 @@ name = "standalone_sql"
path = "examples/standalone-sql.rs"
required-features = ["ballista/standalone"]
+[[example]]
+name = "mtls-cluster"
+path = "examples/mtls-cluster.rs"
+required-features = ["tls"]
+
[dependencies]
+# Optional dependency for TLS support
+rustls = { version = "0.23", features = ["ring"], optional = true }
[dev-dependencies]
+arrow-flight = { workspace = true }
ballista = { path = "../ballista/client", version = "52.0.0" }
ballista-core = { path = "../ballista/core", version = "52.0.0",
default-features = false }
ballista-executor = { path = "../ballista/executor", version = "52.0.0",
default-features = false }
ballista-scheduler = { path = "../ballista/scheduler", version = "52.0.0",
default-features = false }
ctor = { workspace = true }
datafusion = { workspace = true }
+datafusion-proto = { workspace = true }
datafusion-substrait = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
+tempfile = { workspace = true }
testcontainers-modules = { version = "0.14", features = ["minio"] }
tokio = { workspace = true, features = [
"macros",
@@ -64,3 +74,5 @@ default = ["substrait", "standalone"]
standalone = ["ballista/standalone"]
substrait = ["ballista-scheduler/substrait"]
testcontainers = []
+# Enable TLS support for the mtls-cluster example
+tls = ["tonic/tls-ring", "dep:rustls"]
diff --git a/examples/examples/mtls-cluster.rs
b/examples/examples/mtls-cluster.rs
new file mode 100644
index 000000000..c86014bec
--- /dev/null
+++ b/examples/examples/mtls-cluster.rs
@@ -0,0 +1,534 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # mTLS Cluster Configuration Example
+//!
+//! This example demonstrates how to configure mutual TLS (mTLS) for secure
+//! communication between Ballista scheduler, executors, and clients.
+//!
+//! ## Overview
+//!
+//! mTLS provides two-way authentication:
+//! - The client verifies the server's identity using the CA certificate
+//! - The server verifies the client's identity using client certificates
+//!
+//! ## Architecture
+//!
+//! This example uses pull-based scheduling where:
+//! - The scheduler runs a gRPC server with server-side TLS
+//! - Executors poll the scheduler for work (client TLS) and serve Flight data (server TLS)
+//! - Clients connect to the scheduler with client TLS
+//!
+//! ## Running
+//!
+//! ```bash
+//! # Generate certificates
+//! cargo run --example mtls-cluster --features tls -- certs
+//!
+//! # Terminal 1: Start scheduler with TLS
+//! cargo run --example mtls-cluster --features tls -- scheduler
+//!
+//! # Terminal 2: Start executor with TLS
+//! cargo run --example mtls-cluster --features tls -- executor
+//!
+//! # Terminal 3: Run client query with TLS
+//! cargo run --example mtls-cluster --features tls -- client
+//! ```
+
+use std::net::SocketAddr;
+use std::process::Command;
+use std::sync::Arc;
+
+use arrow_flight::flight_service_server::FlightServiceServer;
+use ballista_core::ConfigProducer;
+use ballista_core::extension::{SessionConfigExt, SessionStateExt};
+use ballista_core::serde::protobuf::executor_resource::Resource;
+use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
+use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer;
+use ballista_core::serde::protobuf::{
+ ExecutorRegistration, ExecutorResource, ExecutorSpecification,
+};
+use ballista_core::serde::{
+ BallistaCodec, BallistaLogicalExtensionCodec,
BallistaPhysicalExtensionCodec,
+};
+use ballista_core::utils::create_grpc_client_endpoint;
+use ballista_executor::execution_loop;
+use ballista_executor::executor::Executor;
+use ballista_executor::flight_service::BallistaFlightService;
+use ballista_executor::metrics::LoggingMetricsCollector;
+use ballista_scheduler::cluster::BallistaCluster;
+use ballista_scheduler::config::SchedulerConfig;
+use ballista_scheduler::scheduler_server::SchedulerServer;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+use log::info;
+use tonic::transport::{
+ Certificate, ClientTlsConfig, Endpoint, Identity, Server, ServerTlsConfig,
+};
+
+/// Directory for certificate files
+const CERTS_DIR: &str = "certs";
+const CA_CERT_PATH: &str = "certs/ca.crt";
+const CA_KEY_PATH: &str = "certs/ca.key";
+const SERVER_CERT_PATH: &str = "certs/server.crt";
+const SERVER_KEY_PATH: &str = "certs/server.key";
+const CLIENT_CERT_PATH: &str = "certs/client.crt";
+const CLIENT_KEY_PATH: &str = "certs/client.key";
+
+/// TLS material for the example, loaded from the PEM files under `certs/`.
+#[derive(Clone)]
+struct TlsConfig {
+    /// CA certificate read from `CA_CERT_PATH`; used as the client CA root on servers.
+    ca_cert: Certificate,
+    /// Certificate/key pair read from `SERVER_CERT_PATH` / `SERVER_KEY_PATH`.
+    server_identity: Identity,
+    /// Client-side TLS settings: CA certificate, client identity and the
+    /// `localhost` domain name.
+    client_tls: ClientTlsConfig,
+}
+
+impl TlsConfig {
+    /// Reads the CA, server and client PEM files and builds the TLS settings.
+    ///
+    /// # Errors
+    ///
+    /// Returns the underlying I/O error of the first file that cannot be read.
+    fn load() -> Result<Self, Box<dyn std::error::Error>> {
+        let read_pem = |path: &str| std::fs::read_to_string(path);
+
+        // Files are read in the same fixed order so the first missing file is
+        // the one reported.
+        let ca_pem = read_pem(CA_CERT_PATH)?;
+        let server_cert = read_pem(SERVER_CERT_PATH)?;
+        let server_key = read_pem(SERVER_KEY_PATH)?;
+        let client_cert = read_pem(CLIENT_CERT_PATH)?;
+        let client_key = read_pem(CLIENT_KEY_PATH)?;
+
+        let ca_cert = Certificate::from_pem(&ca_pem);
+        let client_identity = Identity::from_pem(&client_cert, &client_key);
+        let client_tls = ClientTlsConfig::new()
+            .ca_certificate(ca_cert.clone())
+            .identity(client_identity)
+            .domain_name("localhost");
+
+        Ok(Self {
+            ca_cert,
+            server_identity: Identity::from_pem(&server_cert, &server_key),
+            client_tls,
+        })
+    }
+
+    /// Builds a server TLS configuration that presents the server identity and
+    /// sets the CA certificate as the client CA root.
+    fn server_tls_config(&self) -> ServerTlsConfig {
+        let identity = self.server_identity.clone();
+        let client_ca = self.ca_cert.clone();
+        ServerTlsConfig::new()
+            .identity(identity)
+            .client_ca_root(client_ca)
+    }
+}
+
+/// Generate certificates for mTLS using OpenSSL
+///
+/// Creates `CERTS_DIR`, writes a temporary OpenSSL config, then invokes the
+/// `openssl` CLI to produce a CA plus one server and one client certificate
+/// signed by that CA. Intermediate files (CSRs, config, serial) are removed
+/// afterwards on a best-effort basis.
+///
+/// # Errors
+///
+/// Returns an error if the directory or config file cannot be written, or if
+/// any `openssl` invocation fails.
+fn generate_certs() -> Result<(), Box<dyn std::error::Error>> {
+    std::fs::create_dir_all(CERTS_DIR)?;
+    println!("Created {CERTS_DIR}/ directory");
+
+    let openssl_conf = r#"
+[req]
+distinguished_name = req_distinguished_name
+x509_extensions = v3_ca
+prompt = no
+
+[req_distinguished_name]
+CN = Ballista CA
+
+[v3_ca]
+basicConstraints = critical, CA:TRUE
+keyUsage = critical, keyCertSign, cRLSign
+subjectKeyIdentifier = hash
+
+[server_ext]
+basicConstraints = CA:FALSE
+keyUsage = critical, digitalSignature, keyEncipherment
+extendedKeyUsage = serverAuth, clientAuth
+subjectAltName = @alt_names
+subjectKeyIdentifier = hash
+authorityKeyIdentifier = keyid,issuer
+
+[client_ext]
+basicConstraints = CA:FALSE
+keyUsage = critical, digitalSignature, keyEncipherment
+extendedKeyUsage = clientAuth, serverAuth
+subjectAltName = @alt_names
+subjectKeyIdentifier = hash
+authorityKeyIdentifier = keyid,issuer
+
+[alt_names]
+DNS.1 = localhost
+IP.1 = 127.0.0.1
+"#;
+    let conf_path = format!("{CERTS_DIR}/openssl.cnf");
+    std::fs::write(&conf_path, openssl_conf)?;
+
+    println!("Generating CA...");
+    run_openssl(&["genrsa", "-out", CA_KEY_PATH, "4096"])?;
+    run_openssl(&[
+        "req",
+        "-new",
+        "-x509",
+        "-days",
+        "365",
+        "-key",
+        CA_KEY_PATH,
+        "-out",
+        CA_CERT_PATH,
+        "-config",
+        &conf_path,
+        "-extensions",
+        "v3_ca",
+    ])?;
+
+    println!("Generating server certificate...");
+    issue_signed_cert(
+        SERVER_KEY_PATH,
+        SERVER_CERT_PATH,
+        "server.csr",
+        "/CN=localhost",
+        "server_ext",
+        &conf_path,
+    )?;
+
+    println!("Generating client certificate...");
+    issue_signed_cert(
+        CLIENT_KEY_PATH,
+        CLIENT_CERT_PATH,
+        "client.csr",
+        "/CN=ballista-client",
+        "client_ext",
+        &conf_path,
+    )?;
+
+    // Cleanup: removal failures are deliberately ignored, the certificates
+    // themselves have already been written at this point.
+    for f in ["server.csr", "client.csr", "openssl.cnf", "ca.srl"] {
+        let _ = std::fs::remove_file(format!("{CERTS_DIR}/{f}"));
+    }
+
+    println!("\nCertificates generated in {CERTS_DIR}/");
+    println!(" - ca.crt / ca.key : Certificate Authority");
+    println!(" - server.crt / server.key : Server certificate (for scheduler/executor)");
+    println!(" - client.crt / client.key : Client certificate (for executors/clients)");
+    Ok(())
+}
+
+/// Generates a 4096-bit RSA key at `key_path`, a CSR for `subject` at
+/// `{CERTS_DIR}/{csr_name}`, and a 365-day certificate at `cert_path` signed by
+/// the example CA using the `extension` section of the config at `conf_path`.
+fn issue_signed_cert(
+    key_path: &str,
+    cert_path: &str,
+    csr_name: &str,
+    subject: &str,
+    extension: &str,
+    conf_path: &str,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let csr_path = format!("{CERTS_DIR}/{csr_name}");
+
+    run_openssl(&["genrsa", "-out", key_path, "4096"])?;
+    run_openssl(&[
+        "req", "-new", "-key", key_path, "-out", &csr_path, "-subj", subject,
+    ])?;
+    run_openssl(&[
+        "x509",
+        "-req",
+        "-days",
+        "365",
+        "-in",
+        &csr_path,
+        "-CA",
+        CA_CERT_PATH,
+        "-CAkey",
+        CA_KEY_PATH,
+        "-CAcreateserial",
+        "-out",
+        cert_path,
+        "-extfile",
+        conf_path,
+        "-extensions",
+        extension,
+    ])?;
+    Ok(())
+}
+
+/// Runs the `openssl` command-line tool with `args` and waits for it to exit.
+///
+/// # Errors
+///
+/// Returns an error if the process cannot be started (for example when
+/// `openssl` is not installed) or if it exits with a non-success status.
+fn run_openssl(args: &[&str]) -> Result<(), Box<dyn std::error::Error>> {
+    // Add context to spawn failures: a bare "No such file or directory" gives
+    // no hint that the openssl binary is what is missing.
+    let status = Command::new("openssl")
+        .args(args)
+        .status()
+        .map_err(|e| format!("failed to launch openssl (is it installed and on PATH?): {e}"))?;
+
+    if status.success() {
+        Ok(())
+    } else {
+        Err(format!("openssl {} failed", args.join(" ")).into())
+    }
+}
+
+/// Create a ConfigProducer that configures TLS for task execution (shuffle fetching)
+///
+/// Every call of the returned producer yields a Ballista `SessionConfig` with
+/// TLS enabled and an endpoint override that applies `client_tls`.
+fn create_tls_config_producer(client_tls: ClientTlsConfig) -> ConfigProducer {
+    let producer = move || {
+        // Each produced config owns its own copy of the TLS settings.
+        let tls_for_endpoint = client_tls.clone();
+        let apply_tls = move |endpoint: Endpoint| -> Result<
+            Endpoint,
+            Box<dyn std::error::Error + Send + Sync>,
+        > {
+            match endpoint.tls_config(tls_for_endpoint.clone()) {
+                Ok(secured) => Ok(secured),
+                Err(e) => Err(Box::new(e)),
+            }
+        };
+
+        SessionConfig::new_with_ballista()
+            .with_ballista_use_tls(true)
+            .with_ballista_override_create_grpc_client_endpoint(Arc::new(apply_tls))
+    };
+    Arc::new(producer)
+}
+
+/// Start an mTLS-enabled scheduler
+///
+/// Loads the certificates from `certs/`, configures the scheduler so that its
+/// outbound gRPC endpoints get the client TLS settings applied, and serves the
+/// scheduler gRPC service on `0.0.0.0:50050` using the server TLS config.
+///
+/// # Errors
+///
+/// Returns an error if the certificates cannot be loaded, the scheduler fails
+/// to initialise, or the server cannot be configured or stops with an error.
+async fn run_scheduler() -> Result<(), Box<dyn std::error::Error>> {
+    let addr: SocketAddr = "0.0.0.0:50050".parse()?;
+    info!("Starting mTLS scheduler on {addr}");
+
+    let tls = TlsConfig::load()?;
+
+    // Configure scheduler with TLS for outbound connections to executors.
+    // `EndpointOverrideFn` is referenced through `ballista_core::extension`,
+    // the module the scheduler config itself imports it from, and the error is
+    // boxed the same way as the other endpoint overrides in this example.
+    let client_tls = tls.client_tls.clone();
+    let endpoint_override: ballista_core::extension::EndpointOverrideFn =
+        Arc::new(move |endpoint: Endpoint| {
+            endpoint
+                .tls_config(client_tls.clone())
+                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
+        });
+
+    let config = SchedulerConfig {
+        bind_host: "0.0.0.0".to_string(),
+        external_host: "localhost".to_string(),
+        bind_port: 50050,
+        override_create_grpc_client_endpoint: Some(endpoint_override),
+        ..Default::default()
+    };
+
+    let cluster = BallistaCluster::new_from_config(&config).await?;
+
+    // Create scheduler server
+    let codec = BallistaCodec::new(
+        Arc::new(BallistaLogicalExtensionCodec::default()),
+        Arc::new(BallistaPhysicalExtensionCodec::default()),
+    );
+    let metrics_collector = ballista_scheduler::metrics::default_metrics_collector()?;
+
+    let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
+        SchedulerServer::new(
+            config.scheduler_name(),
+            cluster,
+            codec,
+            Arc::new(config),
+            metrics_collector,
+        );
+    scheduler.init().await?;
+
+    // Build gRPC server with mTLS
+    let scheduler_grpc = SchedulerGrpcServer::new(scheduler)
+        .max_decoding_message_size(16 * 1024 * 1024)
+        .max_encoding_message_size(16 * 1024 * 1024);
+
+    info!("Scheduler listening with mTLS on {addr}");
+
+    Server::builder()
+        .tls_config(tls.server_tls_config())?
+        .add_service(scheduler_grpc)
+        .serve(addr)
+        .await?;
+
+    Ok(())
+}
+
+/// Start an mTLS-enabled executor using pull-based scheduling
+///
+/// Loads the certificates, builds an `Executor`, spawns a TLS-protected Flight
+/// server on `0.0.0.0:50051`, connects to the scheduler at
+/// `https://localhost:50050` with the client TLS settings, and then runs the
+/// poll loop. Returns as soon as either spawned task finishes.
+///
+/// # Errors
+///
+/// Returns an error if the certificates cannot be loaded, the temporary work
+/// directory cannot be created, the scheduler connection fails, or either
+/// spawned task ends with an error.
+async fn run_executor() -> Result<(), Box<dyn std::error::Error>> {
+ let flight_addr: SocketAddr = "0.0.0.0:50051".parse()?;
+ info!("Starting mTLS executor, flight service on {flight_addr}");
+
+ let tls = TlsConfig::load()?;
+
+ // Create executor
+ let executor_id = uuid::Uuid::new_v4().to_string();
+ // NOTE(review): `work_dir` stays bound until this function returns;
+ // presumably the directory is removed when it is dropped - confirm against tempfile docs.
+ let work_dir = tempfile::tempdir()?;
+ let work_dir_str = work_dir.path().to_string_lossy().to_string();
+
+ let executor_meta = ExecutorRegistration {
+ id: executor_id.clone(),
+ host: Some("localhost".to_string()),
+ port: 50051,
+ grpc_port: 0, // Not used in pull-based scheduling
+ specification: Some(ExecutorSpecification {
+ resources: vec![ExecutorResource {
+ resource: Some(Resource::TaskSlots(4)),
+ }],
+ }),
+ };
+
+ // Session configs produced for tasks carry the client TLS settings.
+ let config_producer = create_tls_config_producer(tls.client_tls.clone());
+
+ // Create runtime producer
+ let wd = work_dir_str.clone();
+ let runtime_producer: ballista_core::RuntimeProducer = Arc::new(move |_| {
+ Ok(Arc::new(
+ RuntimeEnvBuilder::new()
+ .with_temp_file_path(wd.clone())
+ .build()?,
+ ))
+ });
+
+ let executor = Arc::new(Executor::new(
+ executor_meta,
+ &work_dir_str,
+ runtime_producer,
+ config_producer,
+ Default::default(), // function_registry
+ Arc::new(LoggingMetricsCollector::default()), // metrics_collector
+ 4, // concurrent_tasks
+ None, // execution_engine
+ ));
+
+ // Start Flight service with mTLS for serving shuffle data
+ let flight_service = FlightServiceServer::new(BallistaFlightService::new())
+ .max_decoding_message_size(16 * 1024 * 1024)
+ .max_encoding_message_size(16 * 1024 * 1024);
+
+ // Spawn Flight server with TLS
+ let server_tls = tls.server_tls_config();
+ let flight_handle = tokio::spawn(async move {
+ info!("Executor Flight service listening with mTLS on {flight_addr}");
+ Server::builder()
+ .tls_config(server_tls)
+ .expect("Failed to configure TLS")
+ .add_service(flight_service)
+ .serve(flight_addr)
+ .await
+ });
+
+ // Connect to scheduler with TLS
+ let scheduler_url = "https://localhost:50050";
+ info!("Connecting to scheduler at {scheduler_url}");
+
+ // No GrpcClientConfig is passed here (`None`); only the TLS settings are applied.
+ let endpoint = create_grpc_client_endpoint(scheduler_url.to_string(),
None)?
+ .tls_config(tls.client_tls.clone())?;
+ let connection = endpoint.connect().await?;
+
+ let scheduler = SchedulerGrpcClient::new(connection)
+ .max_encoding_message_size(16 * 1024 * 1024)
+ .max_decoding_message_size(16 * 1024 * 1024);
+
+ let codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
BallistaCodec::new(
+ Arc::new(BallistaLogicalExtensionCodec::default()),
+ Arc::new(BallistaPhysicalExtensionCodec::default()),
+ );
+
+ // Run the pull-based execution loop
+ // This registers the executor and starts polling for tasks
+ info!("Starting execution poll loop...");
+ let poll_handle = tokio::spawn(async move {
+ execution_loop::poll_loop(scheduler, executor, codec).await
+ });
+
+ // Whichever task completes first ends the executor; the double `?`
+ // surfaces first the join error, then the task's own error.
+ tokio::select! {
+ result = flight_handle => {
+ result??;
+ }
+ result = poll_handle => {
+ result??;
+ }
+ }
+
+ Ok(())
+}
+
+/// Run a client query against the mTLS-enabled cluster
+///
+/// Builds a Ballista session whose gRPC endpoints use the client TLS settings,
+/// points it at `https://localhost:50050`, and prints the result of a trivial
+/// SQL query.
+///
+/// # Errors
+///
+/// Returns an error if the certificates cannot be loaded, the session cannot
+/// be upgraded for Ballista, or the query fails.
+async fn run_client() -> Result<(), Box<dyn std::error::Error>> {
+    info!("Connecting to mTLS scheduler at https://localhost:50050");
+
+    // Only the client-side settings are needed here.
+    let TlsConfig { client_tls, .. } = TlsConfig::load()?;
+
+    // Configure session with mTLS
+    let apply_tls = move |endpoint: Endpoint| -> Result<
+        Endpoint,
+        Box<dyn std::error::Error + Send + Sync>,
+    > {
+        match endpoint.tls_config(client_tls.clone()) {
+            Ok(secured) => Ok(secured),
+            Err(e) => Err(Box::new(e)),
+        }
+    };
+    let session_config = SessionConfig::new_with_ballista()
+        .with_ballista_use_tls(true)
+        .with_ballista_override_create_grpc_client_endpoint(Arc::new(apply_tls));
+
+    let session_state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_config(session_config)
+        .build()
+        .upgrade_for_ballista("https://localhost:50050".to_string())?;
+    let ctx = SessionContext::new_with_state(session_state);
+
+    info!("Executing query...");
+    ctx.sql("SELECT 1 + 1 as result").await?.show().await?;
+
+    Ok(())
+}
+
+/// Entry point: dispatches on the first command-line argument
+/// (`certs`, `scheduler`, `executor` or `client`); anything else, including no
+/// argument, prints the usage text.
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Install the ring crypto provider for rustls
+    // This is required when using tonic with TLS
+    rustls::crypto::ring::default_provider()
+        .install_default()
+        .expect("Failed to install rustls crypto provider");
+
+    let log_env = env_logger::Env::default().default_filter_or("info");
+    env_logger::Builder::from_env(log_env).init();
+
+    let args: Vec<String> = std::env::args().collect();
+    let program = args.first().map(String::as_str).unwrap_or("mtls-cluster");
+
+    match args.get(1).map(String::as_str) {
+        Some("certs") => generate_certs()?,
+        Some("scheduler") => run_scheduler().await?,
+        Some("executor") => run_executor().await?,
+        Some("client") => run_client().await?,
+        _ => {
+            println!(
+                r#"Usage: {} <certs|scheduler|executor|client>
+
+mTLS Cluster Example
+====================
+
+This example demonstrates how to configure mutual TLS (mTLS) for secure
+communication between Ballista scheduler, executors, and clients.
+
+Commands:
+ certs Generate TLS certificates (requires openssl)
+ scheduler Start mTLS-enabled scheduler on port 50050
+ executor Start mTLS-enabled executor on port 50051
+ client Run a test query against the cluster
+
+Quick start:
+ cargo run --example mtls-cluster --features tls -- certs
+ cargo run --example mtls-cluster --features tls -- scheduler # terminal 1
+ cargo run --example mtls-cluster --features tls -- executor # terminal 2
+ cargo run --example mtls-cluster --features tls -- client # terminal 3
+
+How it works:
+ - Generates X.509 v3 certificates with proper extensions (SAN, keyUsage)
+ - Scheduler runs gRPC server with ServerTlsConfig for mTLS
+ - Executor uses pull-based scheduling (polls scheduler for tasks)
+ - Executor's Flight service uses ServerTlsConfig for shuffle data
+ - Client uses SessionConfigExt to configure TLS for all connections
+"#,
+                program
+            );
+        }
+    }
+
+    Ok(())
+}
diff --git a/examples/examples/standalone-substrait.rs
b/examples/examples/standalone-substrait.rs
index e0b917597..7e8b2036c 100644
--- a/examples/examples/standalone-substrait.rs
+++ b/examples/examples/standalone-substrait.rs
@@ -406,9 +406,10 @@ impl SubstraitSchedulerClient {
let host = metadata.host.as_str();
let port = metadata.port as u16;
- let mut ballista_client = BallistaClient::try_new(host, port,
max_message_size)
- .await
- .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+ let mut ballista_client =
+ BallistaClient::try_new(host, port, max_message_size, false, None)
+ .await
+ .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
ballista_client
.fetch_partition(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]