This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new b72fdd3e Ignore the previous job_id inside fill_reservations() (#141)
b72fdd3e is described below

commit b72fdd3ef53cdb27c1ada39e2742c36df13af817
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Aug 17 22:38:20 2022 +0800

    Ignore the previous job_id inside fill_reservations() (#141)
    
    * Normalize the serialization and deserialization places of protobuf structs
    
    * Use a vector to indicate the output links for ExecutionStage
    
    * Ignore the previous job_id inside fill_reservations()
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/rust/core/proto/ballista.proto            |   2 +-
 .../rust/core/src/serde/scheduler/from_proto.rs    |  81 +++++++++++-
 ballista/rust/core/src/serde/scheduler/mod.rs      | 146 ---------------------
 ballista/rust/core/src/serde/scheduler/to_proto.rs |  79 ++++++++++-
 .../rust/scheduler/src/state/execution_graph.rs    |  79 ++++++-----
 ballista/rust/scheduler/src/state/mod.rs           |   2 +-
 ballista/rust/scheduler/src/state/task_manager.rs  | 128 ++++++------------
 7 files changed, 245 insertions(+), 272 deletions(-)
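
For context, the second bullet in the commit message is the structural heart of this patch: a stage's single optional output link becomes a vector, so one stage's shuffle output can feed several parent stages, and an empty vector (rather than `None` or a sentinel value) now marks the final stage. A minimal before/after sketch of the shape change; only the field names below come from the diff, the struct names are illustrative:

    // Before: at most one parent stage; `None` marked the final stage.
    struct StageBefore {
        output_link: Option<usize>,
    }

    // After: any number of parent stages; an empty Vec marks the final stage,
    // which also retires the "0 encodes None" protobuf workaround removed
    // further down in this diff.
    struct StageAfter {
        output_links: Vec<usize>,
    }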

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 04ada4bc..7472d7ae 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -427,7 +427,7 @@ message ExecutionGraphStage {
   repeated  GraphStageInput inputs = 4;
   bytes plan = 5;
   repeated TaskStatus task_statuses = 6;
-  uint32 output_link = 7;
+  repeated uint32 output_links = 7;
   bool resolved = 8;
   repeated OperatorMetricsSet stage_metrics = 9;
 }
diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs b/ballista/rust/core/src/serde/scheduler/from_proto.rs
index 970a5878..42a0c67a 100644
--- a/ballista/rust/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -28,7 +28,10 @@ use crate::error::BallistaError;
 use crate::serde::protobuf;
 use crate::serde::protobuf::action::ActionType;
 use crate::serde::protobuf::{operator_metric, NamedCount, NamedGauge, NamedTime};
-use crate::serde::scheduler::{Action, PartitionId, PartitionLocation, PartitionStats};
+use crate::serde::scheduler::{
+    Action, ExecutorData, ExecutorMetadata, ExecutorSpecification, ExecutorState,
+    PartitionId, PartitionLocation, PartitionStats,
+};
 
 impl TryInto<Action> for protobuf::Action {
     type Error = BallistaError;
@@ -202,3 +205,79 @@ impl TryInto<MetricsSet> for protobuf::OperatorMetricsSet {
         Ok(ms)
     }
 }
+
+#[allow(clippy::from_over_into)]
+impl Into<ExecutorMetadata> for protobuf::ExecutorMetadata {
+    fn into(self) -> ExecutorMetadata {
+        ExecutorMetadata {
+            id: self.id,
+            host: self.host,
+            port: self.port as u16,
+            grpc_port: self.grpc_port as u16,
+            specification: self.specification.unwrap().into(),
+        }
+    }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<ExecutorSpecification> for protobuf::ExecutorSpecification {
+    fn into(self) -> ExecutorSpecification {
+        let mut ret = ExecutorSpecification { task_slots: 0 };
+        for resource in self.resources {
+            if let Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
+                resource.resource
+            {
+                ret.task_slots = task_slots
+            }
+        }
+        ret
+    }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<ExecutorData> for protobuf::ExecutorData {
+    fn into(self) -> ExecutorData {
+        let mut ret = ExecutorData {
+            executor_id: self.executor_id,
+            total_task_slots: 0,
+            available_task_slots: 0,
+        };
+        for resource in self.resources {
+            if let Some(task_slots) = resource.total {
+                if let Some(protobuf::executor_resource::Resource::TaskSlots(
+                    task_slots,
+                )) = task_slots.resource
+                {
+                    ret.total_task_slots = task_slots
+                }
+            };
+            if let Some(task_slots) = resource.available {
+                if let Some(protobuf::executor_resource::Resource::TaskSlots(
+                    task_slots,
+                )) = task_slots.resource
+                {
+                    ret.available_task_slots = task_slots
+                }
+            };
+        }
+        ret
+    }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<ExecutorState> for protobuf::ExecutorState {
+    fn into(self) -> ExecutorState {
+        let mut ret = ExecutorState {
+            available_memory_size: u64::MAX,
+        };
+        for metric in self.metrics {
+            if let Some(protobuf::executor_metric::Metric::AvailableMemory(
+                available_memory_size,
+            )) = metric.metric
+            {
+                ret.available_memory_size = available_memory_size
+            }
+        }
+        ret
+    }
+}
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index 369a87d2..38b350d6 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -26,7 +26,6 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::Partitioning;
 use serde::Serialize;
 
-use super::protobuf;
 use crate::error::BallistaError;
 
 pub mod from_proto;
@@ -80,65 +79,12 @@ pub struct ExecutorMetadata {
     pub specification: ExecutorSpecification,
 }
 
-#[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorMetadata> for ExecutorMetadata {
-    fn into(self) -> protobuf::ExecutorMetadata {
-        protobuf::ExecutorMetadata {
-            id: self.id,
-            host: self.host,
-            port: self.port as u32,
-            grpc_port: self.grpc_port as u32,
-            specification: Some(self.specification.into()),
-        }
-    }
-}
-
-impl From<protobuf::ExecutorMetadata> for ExecutorMetadata {
-    fn from(meta: protobuf::ExecutorMetadata) -> Self {
-        Self {
-            id: meta.id,
-            host: meta.host,
-            port: meta.port as u16,
-            grpc_port: meta.grpc_port as u16,
-            specification: meta.specification.unwrap().into(),
-        }
-    }
-}
-
 /// Specification of an executor, indicating executor resources, like total task slots
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
 pub struct ExecutorSpecification {
     pub task_slots: u32,
 }
 
-#[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorSpecification> for ExecutorSpecification {
-    fn into(self) -> protobuf::ExecutorSpecification {
-        protobuf::ExecutorSpecification {
-            resources: vec![protobuf::executor_resource::Resource::TaskSlots(
-                self.task_slots,
-            )]
-            .into_iter()
-            .map(|r| protobuf::ExecutorResource { resource: Some(r) })
-            .collect(),
-        }
-    }
-}
-
-impl From<protobuf::ExecutorSpecification> for ExecutorSpecification {
-    fn from(input: protobuf::ExecutorSpecification) -> Self {
-        let mut ret = Self { task_slots: 0 };
-        for resource in input.resources {
-            if let Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
-                resource.resource
-            {
-                ret.task_slots = task_slots
-            }
-        }
-        ret
-    }
-}
-
 /// From Spark, available resources for an executor, like available task slots
 #[derive(Debug, Clone, Serialize)]
 pub struct ExecutorData {
@@ -152,67 +98,6 @@ pub struct ExecutorDataChange {
     pub task_slots: i32,
 }
 
-struct ExecutorResourcePair {
-    total: protobuf::executor_resource::Resource,
-    available: protobuf::executor_resource::Resource,
-}
-
-#[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorData> for ExecutorData {
-    fn into(self) -> protobuf::ExecutorData {
-        protobuf::ExecutorData {
-            executor_id: self.executor_id,
-            resources: vec![ExecutorResourcePair {
-                total: protobuf::executor_resource::Resource::TaskSlots(
-                    self.total_task_slots,
-                ),
-                available: protobuf::executor_resource::Resource::TaskSlots(
-                    self.available_task_slots,
-                ),
-            }]
-            .into_iter()
-            .map(|r| protobuf::ExecutorResourcePair {
-                total: Some(protobuf::ExecutorResource {
-                    resource: Some(r.total),
-                }),
-                available: Some(protobuf::ExecutorResource {
-                    resource: Some(r.available),
-                }),
-            })
-            .collect(),
-        }
-    }
-}
-
-impl From<protobuf::ExecutorData> for ExecutorData {
-    fn from(input: protobuf::ExecutorData) -> Self {
-        let mut ret = Self {
-            executor_id: input.executor_id,
-            total_task_slots: 0,
-            available_task_slots: 0,
-        };
-        for resource in input.resources {
-            if let Some(task_slots) = resource.total {
-                if let Some(protobuf::executor_resource::Resource::TaskSlots(
-                    task_slots,
-                )) = task_slots.resource
-                {
-                    ret.total_task_slots = task_slots
-                }
-            };
-            if let Some(task_slots) = resource.available {
-                if let Some(protobuf::executor_resource::Resource::TaskSlots(
-                    task_slots,
-                )) = task_slots.resource
-                {
-                    ret.available_task_slots = task_slots
-                }
-            };
-        }
-        ret
-    }
-}
-
 /// The internal state of an executor, like cpu usage, memory usage, etc
 #[derive(Debug, Clone, Copy, Serialize)]
 pub struct ExecutorState {
@@ -220,37 +105,6 @@ pub struct ExecutorState {
     pub available_memory_size: u64,
 }
 
-#[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorState> for ExecutorState {
-    fn into(self) -> protobuf::ExecutorState {
-        protobuf::ExecutorState {
-            metrics: vec![protobuf::executor_metric::Metric::AvailableMemory(
-                self.available_memory_size,
-            )]
-            .into_iter()
-            .map(|m| protobuf::ExecutorMetric { metric: Some(m) })
-            .collect(),
-        }
-    }
-}
-
-impl From<protobuf::ExecutorState> for ExecutorState {
-    fn from(input: protobuf::ExecutorState) -> Self {
-        let mut ret = Self {
-            available_memory_size: u64::MAX,
-        };
-        for metric in input.metrics {
-            if let Some(protobuf::executor_metric::Metric::AvailableMemory(
-                available_memory_size,
-            )) = metric.metric
-            {
-                ret.available_memory_size = available_memory_size
-            }
-        }
-        ret
-    }
-}
-
 /// Summary of executed partition
 #[derive(Debug, Copy, Clone, Default)]
 pub struct PartitionStats {
diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs
index 815bc96d..7517408b 100644
--- a/ballista/rust/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -21,8 +21,12 @@ use std::convert::TryInto;
 use crate::error::BallistaError;
 use crate::serde::protobuf;
 use crate::serde::protobuf::action::ActionType;
+
 use crate::serde::protobuf::{operator_metric, NamedCount, NamedGauge, NamedTime};
-use crate::serde::scheduler::{Action, PartitionId, PartitionLocation, PartitionStats};
+use crate::serde::scheduler::{
+    Action, ExecutorData, ExecutorMetadata, ExecutorSpecification, ExecutorState,
+    PartitionId, PartitionLocation, PartitionStats,
+};
 use datafusion::physical_plan::Partitioning;
 
 impl TryInto<protobuf::Action> for Action {
@@ -171,3 +175,76 @@ impl TryInto<protobuf::OperatorMetricsSet> for MetricsSet {
         Ok(protobuf::OperatorMetricsSet { metrics })
     }
 }
+
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::ExecutorMetadata> for ExecutorMetadata {
+    fn into(self) -> protobuf::ExecutorMetadata {
+        protobuf::ExecutorMetadata {
+            id: self.id,
+            host: self.host,
+            port: self.port as u32,
+            grpc_port: self.grpc_port as u32,
+            specification: Some(self.specification.into()),
+        }
+    }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::ExecutorSpecification> for ExecutorSpecification {
+    fn into(self) -> protobuf::ExecutorSpecification {
+        protobuf::ExecutorSpecification {
+            resources: vec![protobuf::executor_resource::Resource::TaskSlots(
+                self.task_slots,
+            )]
+            .into_iter()
+            .map(|r| protobuf::ExecutorResource { resource: Some(r) })
+            .collect(),
+        }
+    }
+}
+
+struct ExecutorResourcePair {
+    total: protobuf::executor_resource::Resource,
+    available: protobuf::executor_resource::Resource,
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::ExecutorData> for ExecutorData {
+    fn into(self) -> protobuf::ExecutorData {
+        protobuf::ExecutorData {
+            executor_id: self.executor_id,
+            resources: vec![ExecutorResourcePair {
+                total: protobuf::executor_resource::Resource::TaskSlots(
+                    self.total_task_slots,
+                ),
+                available: protobuf::executor_resource::Resource::TaskSlots(
+                    self.available_task_slots,
+                ),
+            }]
+            .into_iter()
+            .map(|r| protobuf::ExecutorResourcePair {
+                total: Some(protobuf::ExecutorResource {
+                    resource: Some(r.total),
+                }),
+                available: Some(protobuf::ExecutorResource {
+                    resource: Some(r.available),
+                }),
+            })
+            .collect(),
+        }
+    }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::ExecutorState> for ExecutorState {
+    fn into(self) -> protobuf::ExecutorState {
+        protobuf::ExecutorState {
+            metrics: vec![protobuf::executor_metric::Metric::AvailableMemory(
+                self.available_memory_size,
+            )]
+            .into_iter()
+            .map(|m| protobuf::ExecutorMetric { metric: Some(m) })
+            .collect(),
+        }
+    }
+}
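
Together with the from_proto.rs half above, serialization for these executor types now lives entirely in this pair of modules. A hedged round-trip sketch of how the two halves compose, assuming prost; the `round_trip` helper is illustrative and not part of the patch:

    use prost::Message;

    // Scheduler-side type -> protobuf type -> bytes -> protobuf type -> back.
    fn round_trip(spec: ExecutorSpecification) -> ExecutorSpecification {
        let proto: protobuf::ExecutorSpecification = spec.into();
        let bytes = proto.encode_to_vec();
        protobuf::ExecutorSpecification::decode(bytes.as_slice())
            .expect("bytes came straight from encode_to_vec")
            .into()
    }

Since ExecutorSpecification derives PartialEq, a test could assert `spec == round_trip(spec)`.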
diff --git a/ballista/rust/scheduler/src/state/execution_graph.rs b/ballista/rust/scheduler/src/state/execution_graph.rs
index b13f7c58..d928f969 100644
--- a/ballista/rust/scheduler/src/state/execution_graph.rs
+++ b/ballista/rust/scheduler/src/state/execution_graph.rs
@@ -101,8 +101,8 @@ pub struct ExecutionStage {
     /// Status of each already scheduled task. If status is None, the partition has not yet been scheduled
     pub(crate) task_statuses: Vec<Option<task_status::Status>>,
     /// Stage ID of the stage that will take this stage's outputs as inputs.
-    /// If `output_link` is `None` then this the final stage in the `ExecutionGraph`
-    pub(crate) output_link: Option<usize>,
+    /// If `output_links` is empty then this is the final stage in the `ExecutionGraph`
+    pub(crate) output_links: Vec<usize>,
     /// Flag indicating whether all input partitions have been resolved and the plan
     /// has UnresolvedShuffleExec operators resolved to ShuffleReadExec operators.
     pub(crate) resolved: bool,
@@ -136,7 +136,7 @@ impl ExecutionStage {
         stage_id: usize,
         plan: Arc<dyn ExecutionPlan>,
         output_partitioning: Option<Partitioning>,
-        output_link: Option<usize>,
+        output_links: Vec<usize>,
         child_stages: Vec<usize>,
     ) -> Self {
         let num_tasks = plan.output_partitioning().partition_count();
@@ -156,7 +156,7 @@ impl ExecutionStage {
             inputs,
             plan,
             task_statuses: vec![None; num_tasks],
-            output_link,
+            output_links,
             resolved,
             stage_metrics: None,
         }
@@ -320,7 +320,7 @@ struct ExecutionStageBuilder {
     /// Map from stage ID -> List of child stage IDs
     stage_dependencies: HashMap<usize, Vec<usize>>,
     /// Map from Stage ID -> output link
-    output_links: HashMap<usize, usize>,
+    output_links: HashMap<usize, Vec<usize>>,
 }
 
 impl ExecutionStageBuilder {
@@ -346,7 +346,7 @@ impl ExecutionStageBuilder {
         for stage in stages {
             let partitioning = stage.shuffle_output_partitioning().cloned();
             let stage_id = stage.stage_id();
-            let output_link = self.output_links.remove(&stage_id);
+            let output_links = self.output_links.remove(&stage_id).unwrap_or_default();
 
             let child_stages = self
                 .stage_dependencies
@@ -359,7 +359,7 @@ impl ExecutionStageBuilder {
                     stage_id,
                     stage,
                     partitioning,
-                    output_link,
+                    output_links,
                     child_stages,
                 ),
             );
@@ -381,11 +381,21 @@ impl ExecutionPlanVisitor for ExecutionStageBuilder {
         } else if let Some(unresolved_shuffle) =
             plan.as_any().downcast_ref::<UnresolvedShuffleExec>()
         {
-            self.output_links
-                .insert(unresolved_shuffle.stage_id, self.current_stage_id);
+            if let Some(output_links) =
+                self.output_links.get_mut(&unresolved_shuffle.stage_id)
+            {
+                if !output_links.contains(&self.current_stage_id) {
+                    output_links.push(self.current_stage_id);
+                }
+            } else {
+                self.output_links
+                    .insert(unresolved_shuffle.stage_id, vec![self.current_stage_id]);
+            }
 
             if let Some(deps) = self.stage_dependencies.get_mut(&self.current_stage_id) {
-                deps.push(unresolved_shuffle.stage_id)
+                if !deps.contains(&unresolved_shuffle.stage_id) {
+                    deps.push(unresolved_shuffle.stage_id);
+                }
             } else {
                 self.stage_dependencies
                     .insert(self.current_stage_id, vec![unresolved_shuffle.stage_id]);
@@ -462,10 +472,10 @@ impl Debug for Task {
 ///
 ///
 /// The DAG structure of this `ExecutionGraph` is encoded in the stages. Each stage's `input` field
-/// will indicate which stages it depends on, and each stage's `output_link` will indicate which
+/// will indicate which stages it depends on, and each stage's `output_links` will indicate which
 /// stage it needs to publish its output to.
 ///
-/// If a stage has `output_link == None` then it is the final stage in this query, and it should
+/// If a stage's `output_links` is empty then it is the final stage in this query, and it should
 /// publish its outputs to the `ExecutionGraph`s `output_locations` representing the final query results.
 #[derive(Clone)]
 pub struct ExecutionGraph {
@@ -616,28 +626,33 @@ impl ExecutionGraph {
                             completed_task.partitions,
                         );
 
-                        if let Some(link) = stage.output_link {
-                            // If this is an intermediate stage, we need to push its `PartitionLocation`s to the parent stage
-                            if let Some(linked_stage) = self.stages.get_mut(&link) {
-                                linked_stage.add_input_partitions(
-                                    stage_id, partition, locations,
-                                )?;
-
-                                // If all tasks for this stage are complete, mark the input complete in the parent stage
-                                if stage_complete {
-                                    linked_stage.complete_input(stage_id);
-                                }
-
-                                // If all input partitions are ready, we can resolve any UnresolvedShuffleExec in the parent stage plan
-                                if linked_stage.resolvable() {
-                                    linked_stage.resolve_shuffles()?;
+                        let output_links = stage.output_links.clone();
+                        if output_links.is_empty() {
+                            // If `output_links` is empty, then this is a final stage
+                            self.output_locations.extend(locations);
+                        } else {
+                            for link in output_links.into_iter() {
+                                // If this is an intermediate stage, we need to push its `PartitionLocation`s to the parent stage
+                                if let Some(linked_stage) = self.stages.get_mut(&link) {
+                                    linked_stage.add_input_partitions(
+                                        stage_id,
+                                        partition,
+                                        locations.clone(),
+                                    )?;
+
+                                    // If all tasks for this stage are complete, mark the input complete in the parent stage
+                                    if stage_complete {
+                                        linked_stage.complete_input(stage_id);
+                                    }
+
+                                    // If all input partitions are ready, we can resolve any UnresolvedShuffleExec in the parent stage plan
+                                    if linked_stage.resolvable() {
+                                        linked_stage.resolve_shuffles()?;
+                                    }
+                                } else {
+                                    return Err(BallistaError::Internal(format!("Error updating job {}: Invalid output link {} for stage {}", job_id, stage_id, link)));
                                 }
-                            } else {
-                                return Err(BallistaError::Internal(format!("Error updating job {}: Invalid output link {} for stage {}", job_id, stage_id, link)));
                             }
-                        } else {
-                            // If `output_link` is `None`, then this is a final stage
-                            self.output_locations.extend(locations);
                         }
                     }
                 } else {
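
The completion hunk above is the behavioral core of the vector change: a finished partition's locations now fan out to every parent stage rather than exactly one, which is why `locations` must be cloned per link. A reduced sketch of that control flow; every type here is a simplified stand-in for the real ExecutionGraph structures:

    use std::collections::HashMap;

    #[derive(Default)]
    struct Stage {
        // Partition locations received so far, keyed by child stage id.
        inputs: HashMap<usize, Vec<u64>>,
        // Parent stages to publish to; empty means this is the final stage.
        output_links: Vec<usize>,
    }

    fn publish(
        stages: &mut HashMap<usize, Stage>,
        final_output: &mut Vec<u64>,
        stage_id: usize,
        locations: Vec<u64>,
    ) -> Result<(), String> {
        let links = stages
            .get(&stage_id)
            .ok_or_else(|| format!("unknown stage {}", stage_id))?
            .output_links
            .clone();
        if links.is_empty() {
            // No parents: these locations are final query results.
            final_output.extend(locations);
        } else {
            for link in links {
                let parent = stages
                    .get_mut(&link)
                    .ok_or_else(|| format!("invalid output link {}", link))?;
                // Each parent stage gets its own copy of the locations.
                parent
                    .inputs
                    .entry(stage_id)
                    .or_default()
                    .extend(locations.iter().copied());
            }
        }
        Ok(())
    }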
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index 1083665f..12546a8e 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -52,7 +52,7 @@ pub fn decode_protobuf<T: Message + Default>(bytes: &[u8]) -> Result<T> {
     })
 }
 
-pub fn decode_into<T: Message + Default, U: From<T>>(bytes: &[u8]) -> Result<U> {
+pub fn decode_into<T: Message + Default + Into<U>, U>(bytes: &[u8]) -> Result<U> {
     T::decode(bytes)
         .map_err(|e| {
             BallistaError::Internal(format!(
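
This bound change is what lets decode_into keep working with the conversions that moved into from_proto.rs: those are hand-written `impl Into<...> for protobuf::...` blocks, and a manual Into impl does not give you the matching From impl (only `impl From<T> for U` yields `Into<U> for T` via the blanket impl, not the reverse), so the old `U: From<T>` bound would no longer be satisfied. A standalone sketch of the relaxed signature, assuming prost and with the crate's error type simplified away:

    use prost::Message;

    // Decode protobuf bytes into T, then convert into the domain type U.
    // `T: Into<U>` accepts manual Into impls that `U: From<T>` would reject.
    fn decode_into<T: Message + Default + Into<U>, U>(
        bytes: &[u8],
    ) -> Result<U, prost::DecodeError> {
        T::decode(bytes).map(|t| t.into())
    }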
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs
index cc329263..dcee3722 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -233,98 +233,56 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         &self,
         reservations: &[ExecutorReservation],
     ) -> Result<(Vec<(String, Task)>, Vec<ExecutorReservation>, usize)> {
-        let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
+        // Reinitialize the free reservations.
+        let free_reservations: Vec<ExecutorReservation> = reservations
+            .iter()
+            .map(|reservation| {
+                ExecutorReservation::new_free(reservation.executor_id.clone())
+            })
+            .collect();
 
+        let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
         with_lock(lock, async {
-            let mut assignments: Vec<(String, Task)> = vec![];
-            let mut free_reservations: Vec<ExecutorReservation> = vec![];
-            // let _txn_ops: Vec<(Keyspace, String, Vec<u8>)> = vec![];
-
-            // Need to collect graphs we update so we can update them in storage when we are done
-            let mut graphs: HashMap<String, ExecutionGraph> = HashMap::new();
-
-            // First try and fill reservations for particular jobs. If the job has no more tasks
-            // free the reservation.
-            for reservation in reservations {
-                debug!(
-                "Filling reservation for executor {} from job {:?}",
-                reservation.executor_id, reservation.job_id
-            );
-                let executor_id = &reservation.executor_id;
-                if let Some(job_id) = &reservation.job_id {
-                    if let Some(graph) = graphs.get_mut(job_id) {
-                        if let Ok(Some(next_task)) = graph.pop_next_task(executor_id) {
-                            debug!(
-                            "Filled reservation for executor {} with task {:?}",
-                            executor_id, next_task
-                        );
-                            assignments.push((executor_id.clone(), next_task));
-                        } else {
-                            debug!("Cannot fill reservation for executor {} from job {}, freeing reservation", executor_id, job_id);
-                            free_reservations
-                                .push(ExecutorReservation::new_free(executor_id.clone()));
-                        }
-                    } else {
-                        // let lock = self.state.lock(Keyspace::ActiveJobs, job_id).await?;
-                        let mut graph = self.get_execution_graph(job_id).await?;
-
-                        if let Ok(Some(next_task)) = graph.pop_next_task(executor_id) {
-                            debug!(
-                            "Filled reservation for executor {} with task {:?}",
-                            executor_id, next_task
-                        );
-                            assignments.push((executor_id.clone(), next_task));
-                            graphs.insert(job_id.clone(), graph);
-                            // locks.push(lock);
-                        } else {
-                            debug!("Cannot fill reservation for executor {} from job {}, freeing reservation", executor_id, job_id);
-                            free_reservations
-                                .push(ExecutorReservation::new_free(executor_id.clone()));
-                        }
-                    }
-                } else {
-                    free_reservations.push(reservation.clone());
-                }
-            }
-
-            let mut other_jobs: Vec<String> =
+            let mut jobs: Vec<String> =
                 self.get_active_jobs().await?.into_iter().collect();
 
+            let mut assignments: Vec<(String, Task)> = vec![];
             let mut unassigned: Vec<ExecutorReservation> = vec![];
+            // Need to collect graphs we update so we can update them in storage when we are done
+            let mut graphs: HashMap<String, ExecutionGraph> = HashMap::new();
             // Now try and find tasks for free reservations from current set of graphs
             for reservation in free_reservations {
                 debug!(
-                "Filling free reservation for executor {}",
-                reservation.executor_id
-            );
+                    "Filling free reservation for executor {}",
+                    reservation.executor_id
+                );
                 let mut assigned = false;
                 let executor_id = reservation.executor_id.clone();
 
                 // Try and find a task in the graphs we already have locks on
                 if let Ok(Some(assignment)) = find_next_task(&executor_id, &mut graphs) {
                     debug!(
-                    "Filled free reservation for executor {} with task {:?}",
-                    reservation.executor_id, assignment.1
-                );
+                        "Filled free reservation for executor {} with task {:?}",
+                        reservation.executor_id, assignment.1
+                    );
                     // First check if we can find another task
                     assignments.push(assignment);
                     assigned = true;
                 } else {
                     // Otherwise start searching through other active jobs.
                     debug!(
-                    "Filling free reservation for executor {} from active jobs {:?}",
-                    reservation.executor_id, other_jobs
-                );
-                    while let Some(job_id) = other_jobs.pop() {
+                        "Filling free reservation for executor {} from active jobs {:?}",
+                        reservation.executor_id, jobs
+                    );
+                    while let Some(job_id) = jobs.pop() {
                         if graphs.get(&job_id).is_none() {
-                            // let lock = self.state.lock(Keyspace::ActiveJobs, &job_id).await?;
                             let mut graph = self.get_execution_graph(&job_id).await?;

                             if let Ok(Some(task)) = graph.pop_next_task(&executor_id) {
                                 debug!(
-                                "Filled free reservation for executor {} with task {:?}",
-                                reservation.executor_id, task
-                            );
+                                    "Filled free reservation for executor {} with task {:?}",
+                                    reservation.executor_id, task
+                                );
                                 assignments.push((executor_id.clone(), task));
                                 // locks.push(lock);
                                 graphs.insert(job_id, graph);
@@ -339,9 +297,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
 
                 if !assigned {
                     debug!(
-                    "Unable to fill reservation for executor {}, no tasks available",
-                    executor_id
-                );
+                        "Unable to fill reservation for executor {}, no tasks available",
+                        executor_id
+                    );
                     unassigned.push(reservation);
                 }
             }
@@ -547,15 +505,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                 }
             }
 
-            // This is a little hacky but since we can't make an optional
-            // primitive field in protobuf, we just use 0 to encode None.
-            // Should work since stage IDs are 1-indexed.
-            let output_link = if stage.output_link == 0 {
-                None
-            } else {
-                Some(stage.output_link as usize)
-            };
-
             let output_partitioning: Option<Partitioning> =
                 parse_protobuf_hash_partitioning(
                     stage.output_partitioning.as_ref(),
@@ -608,7 +557,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                 inputs,
                 plan,
                 task_statuses,
-                output_link,
+                output_links: stage
+                    .output_links
+                    .into_iter()
+                    .map(|l| l as usize)
+                    .collect(),
                 resolved: stage.resolved,
                 stage_metrics,
             };
@@ -642,15 +595,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
             .stages
             .into_iter()
             .map(|(stage_id, stage)| {
-                // This is a little hacky but since we can't make an optional
-                // primitive field in protobuf, we just use 0 to encode None.
-                // Should work since stage IDs are 1-indexed.
-                let output_link = if let Some(link) = stage.output_link {
-                    link as u32
-                } else {
-                    0
-                };
-
                 let mut plan: Vec<u8> = vec![];
 
                 U::try_from_physical_plan(
@@ -715,7 +659,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                     inputs,
                     plan,
                     task_statuses,
-                    output_link,
+                    output_links: stage
+                        .output_links
+                        .into_iter()
+                        .map(|l| l as u32)
+                        .collect(),
                     resolved: stage.resolved,
                     stage_metrics,
                 })
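
The net effect of the task_manager.rs rewrite, and the change the commit title names, is that any `job_id` a reservation carried over from a previous assignment round is now discarded up front: every incoming reservation is reissued as a free one before task matching begins, which is what allowed the whole per-job first pass to be deleted. A minimal sketch of that normalization step, with ExecutorReservation cut down to the two fields the hunk touches:

    #[derive(Clone, Debug)]
    struct ExecutorReservation {
        executor_id: String,
        // Previously biased assignment toward this job; now ignored.
        job_id: Option<String>,
    }

    impl ExecutorReservation {
        fn new_free(executor_id: String) -> Self {
            Self { executor_id, job_id: None }
        }
    }

    // Mirrors the first hunk of fill_reservations(): reinitialize every
    // incoming reservation as free, dropping any previous job_id.
    fn reinitialize(reservations: &[ExecutorReservation]) -> Vec<ExecutorReservation> {
        reservations
            .iter()
            .map(|r| ExecutorReservation::new_free(r.executor_id.clone()))
            .collect()
    }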
