thinkharderdev commented on a change in pull request #1983:
URL: https://github.com/apache/arrow-datafusion/pull/1983#discussion_r827174662



##########
File path: ballista/rust/scheduler/src/scheduler_server/event_loop.rs
##########
@@ -50,32 +47,53 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
     pub fn new(
         state: Arc<SchedulerState<T, U>>,
         executors_client: ExecutorsClient,
+        is_test: bool,
     ) -> Self {
         Self {
             state,
             executors_client,
+            is_test,
         }
     }
 
-    async fn offer_resources(
-        &self,
-        job_id: String,
-    ) -> Result<Option<SchedulerServerEvent>> {
-        let mut available_executors = 
self.state.get_available_executors_data();
+    async fn offer_resources(&self, n: u32) -> 
Result<Option<SchedulerServerEvent>> {
+        let mut available_executors = if self.is_test {

Review comment:
       Could we use a `cfg` to achieve the same result. So something like:
   
   ```
       #[cfg(not(test))]
       pub(crate) fn get_available_executors_data(&self) -> Vec<ExecutorData> {
           let mut res = {
               let alive_executors = 
self.get_alive_executors_within_one_minute();
               let executors_data = self.executors_data.read();
               executors_data
                   .iter()
                   .filter_map(|(exec, data)| {
                       (data.available_task_slots > 0 && 
alive_executors.contains(exec))
                           .then(|| data.clone())
                   })
                   .collect::<Vec<ExecutorData>>()
           };
           res.sort_by(|a, b| Ord::cmp(&b.available_task_slots, 
&a.available_task_slots));
           res
       }
   
       #[cfg(test)]
       pub(crate) fn get_available_executors_data(&self) -> Vec<ExecutorData> {
           let mut res: Vec<ExecutorData> =
               self.executors_data.read().values().cloned().collect();
           res.sort_by(|a, b| Ord::cmp(&b.available_task_slots, 
&a.available_task_slots));
           res
       }
   ```
   
   and then
   
   ```
           #[cfg(not(test))]
           if num_tasks > 0 {
               self.launch_tasks(&executors_delta_data, tasks_assigment)
                   .await?;
           }
   ```
   
   This still makes the code slightly harder to read but it removes any runtime 
overhead and also localizes the additional flags to two places. 
   

##########
File path: ballista/rust/scheduler/src/scheduler_server/event_loop.rs
##########
@@ -107,14 +125,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                             }
                         })
                         .collect::<Vec<String>>(),
-                    executor_data.executor_id
+                    executor_delta_data.executor_id
                 );
                 let mut client = {
                     let clients = self.executors_client.read().await;
-                    clients.get(&executor_data.executor_id).unwrap().clone()
+                    clients
+                        .get(&executor_delta_data.executor_id)
+                        .unwrap()
+                        .clone()
                 };
                 // Update the resources first
-                self.state.save_executor_data(executor_data.clone());
+                self.state
+                    .executor_manager
+                    .update_executor_data(executor_delta_data);
                 // TODO check whether launching task is successful or not
                 client.launch_task(LaunchTaskParams { task: tasks }).await?;

Review comment:
       Should this be in the reverse order? Seems like a potential issue if 
launching the task fails then the executor's task slots will never be freed. 
   
   Doing it in reverse (launch tasks -> update state) could also be a problem, 
so maybe a write lock needs to be acquired before launching tasks?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to