martin-g commented on code in PR #1685:
URL:
https://github.com/apache/datafusion-ballista/pull/1685#discussion_r3221631066
##########
ballista/executor/src/standalone.rs:
##########
@@ -146,8 +259,108 @@ pub async fn new_standalone_executor_from_builder(
Ok(())
}
-/// Creates standalone executor with most values
-/// set as default.
+async fn push_staged_standalone_executor(
+ scheduler: SchedulerGrpcClient<Channel>,
+ concurrent_tasks: usize,
+ config_producer: ConfigProducer,
+ runtime_producer: RuntimeProducer,
+ codec: BallistaCodec,
+ function_registry: BallistaFunctionRegistry,
+) -> Result<()> {
+ let flight_listener = TcpListener::bind("localhost:0").await?;
+ let flight_addr = flight_listener.local_addr()?;
+ info!(
+ "Ballista v{BALLISTA_VERSION} Rust Executor (push) listening on
{flight_addr:?}"
+ );
+
+ let grpc_probe = TcpListener::bind("127.0.0.1:0").await?;
+ let grpc_port = grpc_probe.local_addr()?.port();
+ drop(grpc_probe);
+
+ let executor_meta = ExecutorRegistration {
+ id: Uuid::new_v4().to_string(),
+ host: Some("localhost".to_string()),
+ port: flight_addr.port() as u32,
+ grpc_port: grpc_port as u32,
+ specification: Some(
+ ExecutorSpecification::default()
+ .with_task_slots(concurrent_tasks as u32)
+ .into(),
+ ),
+ os_info: Some(ExecutorOperatingSystemSpecification::default().into()),
+ };
+
+ let config_snap = config_producer();
+ let max_message_sz = config_snap.ballista_grpc_client_max_message_size()
as u32;
+
+ let work_dir = TempDir::new()?.path().to_str().unwrap().to_string();
+ info!("work_dir: {work_dir}");
+
+ let executor = Arc::new(Executor::with_default_execution_engine(
+ executor_meta,
+ &work_dir,
+ runtime_producer,
+ config_producer.clone(),
+ Arc::new(function_registry),
+ Arc::new(LoggingMetricsCollector::default()),
+ concurrent_tasks,
+ ));
+
+ let service = BallistaFlightService::new(work_dir);
+ let server = FlightServiceServer::new(service)
+ .max_decoding_message_size(max_message_sz as usize)
+ .max_encoding_message_size(max_message_sz as usize);
+
+ tokio::spawn(
+ create_grpc_server(&GrpcServerConfig::default())
+ .add_service(server)
+
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
+ flight_listener,
+ )),
+ );
+
+ let exec_cfg = ExecutorProcessConfig {
+ bind_host: "127.0.0.1".into(),
+ port: flight_addr.port(),
+ grpc_port,
+ concurrent_tasks,
+ task_scheduling_policy: TaskSchedulingPolicy::PushStaged,
+ grpc_max_decoding_message_size: max_message_sz,
+ grpc_max_encoding_message_size: max_message_sz,
+ ..ExecutorProcessConfig::default()
+ };
+
+ let shutdown_notifier: &'static ShutdownNotifier =
+ Box::leak(Box::new(ShutdownNotifier::new()));
+ let (stop_send, _stop_recv) = mpsc::channel::<bool>(10);
Review Comment:
What is the idea behind immediately dropping the Receiver?
The Sender will fail when one tries to use it.
##########
ballista/executor/src/standalone.rs:
##########
@@ -146,8 +259,108 @@ pub async fn new_standalone_executor_from_builder(
Ok(())
}
-/// Creates standalone executor with most values
-/// set as default.
+async fn push_staged_standalone_executor(
+ scheduler: SchedulerGrpcClient<Channel>,
+ concurrent_tasks: usize,
+ config_producer: ConfigProducer,
+ runtime_producer: RuntimeProducer,
+ codec: BallistaCodec,
+ function_registry: BallistaFunctionRegistry,
+) -> Result<()> {
+ let flight_listener = TcpListener::bind("localhost:0").await?;
+ let flight_addr = flight_listener.local_addr()?;
+ info!(
+ "Ballista v{BALLISTA_VERSION} Rust Executor (push) listening on
{flight_addr:?}"
+ );
+
+ let grpc_probe = TcpListener::bind("127.0.0.1:0").await?;
+ let grpc_port = grpc_probe.local_addr()?.port();
+ drop(grpc_probe);
+
+ let executor_meta = ExecutorRegistration {
+ id: Uuid::new_v4().to_string(),
+ host: Some("localhost".to_string()),
Review Comment:
Sometimes you use `127.0.0.1` and other times `localhost`. Why?
I guess you replaced `localhost` with `127.0.0.1` to make it explicit that
IPv4 should be used, but the new usages of `localhost` confuse me.
##########
ballista/executor/src/standalone.rs:
##########
@@ -146,8 +259,108 @@ pub async fn new_standalone_executor_from_builder(
Ok(())
}
-/// Creates standalone executor with most values
-/// set as default.
+async fn push_staged_standalone_executor(
+ scheduler: SchedulerGrpcClient<Channel>,
+ concurrent_tasks: usize,
+ config_producer: ConfigProducer,
+ runtime_producer: RuntimeProducer,
+ codec: BallistaCodec,
+ function_registry: BallistaFunctionRegistry,
+) -> Result<()> {
+ let flight_listener = TcpListener::bind("localhost:0").await?;
+ let flight_addr = flight_listener.local_addr()?;
+ info!(
+ "Ballista v{BALLISTA_VERSION} Rust Executor (push) listening on
{flight_addr:?}"
+ );
+
+ let grpc_probe = TcpListener::bind("127.0.0.1:0").await?;
+ let grpc_port = grpc_probe.local_addr()?.port();
+ drop(grpc_probe);
Review Comment:
Probe-and-drop is subject to a TOCTOU race condition.
Some other process may bind to the now-available port before you try to bind
it yourself.
##########
ballista/executor/src/standalone.rs:
##########
@@ -146,8 +259,108 @@ pub async fn new_standalone_executor_from_builder(
Ok(())
}
-/// Creates standalone executor with most values
-/// set as default.
+async fn push_staged_standalone_executor(
+ scheduler: SchedulerGrpcClient<Channel>,
+ concurrent_tasks: usize,
+ config_producer: ConfigProducer,
+ runtime_producer: RuntimeProducer,
+ codec: BallistaCodec,
+ function_registry: BallistaFunctionRegistry,
+) -> Result<()> {
+ let flight_listener = TcpListener::bind("localhost:0").await?;
+ let flight_addr = flight_listener.local_addr()?;
+ info!(
+ "Ballista v{BALLISTA_VERSION} Rust Executor (push) listening on
{flight_addr:?}"
+ );
+
+ let grpc_probe = TcpListener::bind("127.0.0.1:0").await?;
+ let grpc_port = grpc_probe.local_addr()?.port();
+ drop(grpc_probe);
+
+ let executor_meta = ExecutorRegistration {
+ id: Uuid::new_v4().to_string(),
+ host: Some("localhost".to_string()),
+ port: flight_addr.port() as u32,
+ grpc_port: grpc_port as u32,
+ specification: Some(
+ ExecutorSpecification::default()
+ .with_task_slots(concurrent_tasks as u32)
+ .into(),
+ ),
+ os_info: Some(ExecutorOperatingSystemSpecification::default().into()),
+ };
+
+ let config_snap = config_producer();
+ let max_message_sz = config_snap.ballista_grpc_client_max_message_size()
as u32;
Review Comment:
```suggestion
let max_message_size =
config_snap.ballista_grpc_client_max_message_size() as u32;
```
`sz` is not very clear. It is also not consistent with
[pull](https://github.com/apache/datafusion-ballista/pull/1685/changes#diff-2bdd8710486575e8337fd9ad57940750b92aa9566a88bd34d811e119f7386571R230)
##########
ballista/executor/src/executor_server.rs:
##########
@@ -148,6 +169,8 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static
+ AsExecutionPlan>(
})
};
+ wait_executor_grpc_listen(&config.bind_host, config.grpc_port).await?;
Review Comment:
You already have `addr` here, a `std::net::SocketAddr`. There is no need to pass
`host` + `port` and parse them again in the body of `wait_executor_grpc_listen()`.
##########
ballista/executor/src/standalone.rs:
##########
@@ -146,8 +259,108 @@ pub async fn new_standalone_executor_from_builder(
Ok(())
}
-/// Creates standalone executor with most values
-/// set as default.
+async fn push_staged_standalone_executor(
+ scheduler: SchedulerGrpcClient<Channel>,
+ concurrent_tasks: usize,
+ config_producer: ConfigProducer,
+ runtime_producer: RuntimeProducer,
+ codec: BallistaCodec,
+ function_registry: BallistaFunctionRegistry,
+) -> Result<()> {
+ let flight_listener = TcpListener::bind("localhost:0").await?;
+ let flight_addr = flight_listener.local_addr()?;
+ info!(
+ "Ballista v{BALLISTA_VERSION} Rust Executor (push) listening on
{flight_addr:?}"
+ );
+
+ let grpc_probe = TcpListener::bind("127.0.0.1:0").await?;
+ let grpc_port = grpc_probe.local_addr()?.port();
+ drop(grpc_probe);
+
+ let executor_meta = ExecutorRegistration {
+ id: Uuid::new_v4().to_string(),
+ host: Some("localhost".to_string()),
+ port: flight_addr.port() as u32,
+ grpc_port: grpc_port as u32,
+ specification: Some(
+ ExecutorSpecification::default()
+ .with_task_slots(concurrent_tasks as u32)
+ .into(),
+ ),
+ os_info: Some(ExecutorOperatingSystemSpecification::default().into()),
+ };
+
+ let config_snap = config_producer();
+ let max_message_sz = config_snap.ballista_grpc_client_max_message_size()
as u32;
Review Comment:
Why the cast to `u32`?
##########
ballista/executor/src/executor_server.rs:
##########
@@ -148,6 +169,8 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static
+ AsExecutionPlan>(
})
};
+ wait_executor_grpc_listen(&config.bind_host, config.grpc_port).await?;
Review Comment:
Is `config.grpc_port` the same as `executor_meta.grpc_port` ?
https://github.com/apache/datafusion-ballista/pull/1685/changes#diff-219ce667b5c79a620f1b44a8ba862134f4e1f7cfee78c737deaeeb50be71ac14R149
##########
ballista/executor/src/standalone.rs:
##########
@@ -146,8 +259,108 @@ pub async fn new_standalone_executor_from_builder(
Ok(())
}
-/// Creates standalone executor with most values
-/// set as default.
+async fn push_staged_standalone_executor(
+ scheduler: SchedulerGrpcClient<Channel>,
+ concurrent_tasks: usize,
+ config_producer: ConfigProducer,
+ runtime_producer: RuntimeProducer,
+ codec: BallistaCodec,
+ function_registry: BallistaFunctionRegistry,
+) -> Result<()> {
+ let flight_listener = TcpListener::bind("localhost:0").await?;
+ let flight_addr = flight_listener.local_addr()?;
+ info!(
+ "Ballista v{BALLISTA_VERSION} Rust Executor (push) listening on
{flight_addr:?}"
+ );
+
+ let grpc_probe = TcpListener::bind("127.0.0.1:0").await?;
+ let grpc_port = grpc_probe.local_addr()?.port();
+ drop(grpc_probe);
+
+ let executor_meta = ExecutorRegistration {
+ id: Uuid::new_v4().to_string(),
+ host: Some("localhost".to_string()),
+ port: flight_addr.port() as u32,
+ grpc_port: grpc_port as u32,
+ specification: Some(
+ ExecutorSpecification::default()
+ .with_task_slots(concurrent_tasks as u32)
+ .into(),
+ ),
+ os_info: Some(ExecutorOperatingSystemSpecification::default().into()),
+ };
+
+ let config_snap = config_producer();
+ let max_message_sz = config_snap.ballista_grpc_client_max_message_size()
as u32;
+
+ let work_dir = TempDir::new()?.path().to_str().unwrap().to_string();
+ info!("work_dir: {work_dir}");
+
+ let executor = Arc::new(Executor::with_default_execution_engine(
+ executor_meta,
+ &work_dir,
+ runtime_producer,
+ config_producer.clone(),
+ Arc::new(function_registry),
+ Arc::new(LoggingMetricsCollector::default()),
+ concurrent_tasks,
+ ));
+
+ let service = BallistaFlightService::new(work_dir);
+ let server = FlightServiceServer::new(service)
+ .max_decoding_message_size(max_message_sz as usize)
+ .max_encoding_message_size(max_message_sz as usize);
+
+ tokio::spawn(
+ create_grpc_server(&GrpcServerConfig::default())
+ .add_service(server)
+
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
+ flight_listener,
+ )),
+ );
+
+ let exec_cfg = ExecutorProcessConfig {
+ bind_host: "127.0.0.1".into(),
+ port: flight_addr.port(),
+ grpc_port,
+ concurrent_tasks,
+ task_scheduling_policy: TaskSchedulingPolicy::PushStaged,
+ grpc_max_decoding_message_size: max_message_sz,
+ grpc_max_encoding_message_size: max_message_sz,
+ ..ExecutorProcessConfig::default()
+ };
+
+ let shutdown_notifier: &'static ShutdownNotifier =
+ Box::leak(Box::new(ShutdownNotifier::new()));
Review Comment:
Could you use `static` + `Lazy` instead of `Box::leak()`?
As written, this leaks a new notifier on every call to this function.
##########
ballista/executor/src/standalone.rs:
##########
@@ -93,16 +115,72 @@ pub async fn new_standalone_executor_from_builder(
codec: BallistaCodec,
function_registry: BallistaFunctionRegistry,
) -> Result<()> {
- // Let the OS assign a random, free port
+ new_standalone_executor_from_builder_with_scheduling_policy(
+ scheduler,
+ concurrent_tasks,
+ config_producer,
+ runtime_producer,
+ codec,
+ function_registry,
+ TaskSchedulingPolicy::PullStaged,
+ )
+ .await
+}
+
+/// Same as [`new_standalone_executor_from_builder`] with selectable
[`TaskSchedulingPolicy`].
+///
+/// Push mode starts the executor gRPC server required for staged task push
from the scheduler.
+pub async fn new_standalone_executor_from_builder_with_scheduling_policy(
+ scheduler: SchedulerGrpcClient<Channel>,
+ concurrent_tasks: usize,
+ config_producer: ConfigProducer,
+ runtime_producer: RuntimeProducer,
+ codec: BallistaCodec,
+ function_registry: BallistaFunctionRegistry,
+ scheduling_policy: TaskSchedulingPolicy,
+) -> Result<()> {
+ match scheduling_policy {
+ TaskSchedulingPolicy::PullStaged => {
+ pull_staged_standalone_executor(
+ scheduler,
+ concurrent_tasks,
+ config_producer,
+ runtime_producer,
+ codec,
+ function_registry,
+ )
+ .await
+ }
+ TaskSchedulingPolicy::PushStaged => {
+ push_staged_standalone_executor(
+ scheduler,
+ concurrent_tasks,
+ config_producer,
+ runtime_producer,
+ codec,
+ function_registry,
+ )
+ .await
+ }
+ }
+}
+
+async fn pull_staged_standalone_executor(
+ scheduler: SchedulerGrpcClient<Channel>,
+ concurrent_tasks: usize,
+ config_producer: ConfigProducer,
+ runtime_producer: RuntimeProducer,
+ codec: BallistaCodec,
+ function_registry: BallistaFunctionRegistry,
+) -> Result<()> {
let listener = TcpListener::bind("localhost:0").await?;
let address = listener.local_addr()?;
info!("Ballista v{BALLISTA_VERSION} Rust Executor listening on
{address:?}");
let executor_meta = ExecutorRegistration {
- id: Uuid::new_v4().to_string(), // assign this executor a unique ID
+ id: Uuid::new_v4().to_string(),
host: Some("localhost".to_string()),
port: address.port() as u32,
- // TODO Make it configurable
Review Comment:
Why was this comment removed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]