abhinavgautam01 commented on code in PR #1685:
URL: 
https://github.com/apache/datafusion-ballista/pull/1685#discussion_r3223137146


##########
ballista/executor/src/standalone.rs:
##########
@@ -93,16 +115,72 @@ pub async fn new_standalone_executor_from_builder(
     codec: BallistaCodec,
     function_registry: BallistaFunctionRegistry,
 ) -> Result<()> {
-    // Let the OS assign a random, free port
+    new_standalone_executor_from_builder_with_scheduling_policy(
+        scheduler,
+        concurrent_tasks,
+        config_producer,
+        runtime_producer,
+        codec,
+        function_registry,
+        TaskSchedulingPolicy::PullStaged,
+    )
+    .await
+}
+
+/// Same as [`new_standalone_executor_from_builder`] with selectable 
[`TaskSchedulingPolicy`].
+///
+/// Push mode starts the executor gRPC server required for staged task push 
from the scheduler.
+pub async fn new_standalone_executor_from_builder_with_scheduling_policy(
+    scheduler: SchedulerGrpcClient<Channel>,
+    concurrent_tasks: usize,
+    config_producer: ConfigProducer,
+    runtime_producer: RuntimeProducer,
+    codec: BallistaCodec,
+    function_registry: BallistaFunctionRegistry,
+    scheduling_policy: TaskSchedulingPolicy,
+) -> Result<()> {
+    match scheduling_policy {
+        TaskSchedulingPolicy::PullStaged => {
+            pull_staged_standalone_executor(
+                scheduler,
+                concurrent_tasks,
+                config_producer,
+                runtime_producer,
+                codec,
+                function_registry,
+            )
+            .await
+        }
+        TaskSchedulingPolicy::PushStaged => {
+            push_staged_standalone_executor(
+                scheduler,
+                concurrent_tasks,
+                config_producer,
+                runtime_producer,
+                codec,
+                function_registry,
+            )
+            .await
+        }
+    }
+}
+
+async fn pull_staged_standalone_executor(
+    scheduler: SchedulerGrpcClient<Channel>,
+    concurrent_tasks: usize,
+    config_producer: ConfigProducer,
+    runtime_producer: RuntimeProducer,
+    codec: BallistaCodec,
+    function_registry: BallistaFunctionRegistry,
+) -> Result<()> {
     let listener = TcpListener::bind("localhost:0").await?;
     let address = listener.local_addr()?;
     info!("Ballista v{BALLISTA_VERSION} Rust Executor listening on 
{address:?}");
 
     let executor_meta = ExecutorRegistration {
-        id: Uuid::new_v4().to_string(), // assign this executor a unique ID
+        id: Uuid::new_v4().to_string(),
         host: Some("localhost".to_string()),
         port: address.port() as u32,
-        // TODO Make it configurable

Review Comment:
   restored an explicit TODO next to the hard-coded grpc_port on standalone 
pull registration...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to