This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new b72fdd3e Ignore the previous job_id inside fill_reservations() (#141)
b72fdd3e is described below
commit b72fdd3ef53cdb27c1ada39e2742c36df13af817
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Aug 17 22:38:20 2022 +0800
Ignore the previous job_id inside fill_reservations() (#141)
* Move the protobuf serialization/deserialization impls of the executor structs into to_proto.rs and from_proto.rs
* Use a vector to indicate the output links for ExecutionStage
* Ignore the previous job_id inside fill_reservations()
Co-authored-by: yangzhong <[email protected]>
---
ballista/rust/core/proto/ballista.proto | 2 +-
.../rust/core/src/serde/scheduler/from_proto.rs | 81 +++++++++++-
ballista/rust/core/src/serde/scheduler/mod.rs | 146 ---------------------
ballista/rust/core/src/serde/scheduler/to_proto.rs | 79 ++++++++++-
.../rust/scheduler/src/state/execution_graph.rs | 79 ++++++-----
ballista/rust/scheduler/src/state/mod.rs | 2 +-
ballista/rust/scheduler/src/state/task_manager.rs | 128 ++++++------------
7 files changed, 245 insertions(+), 272 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index 04ada4bc..7472d7ae 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -427,7 +427,7 @@ message ExecutionGraphStage {
repeated GraphStageInput inputs = 4;
bytes plan = 5;
repeated TaskStatus task_statuses = 6;
- uint32 output_link = 7;
+ repeated uint32 output_links = 7;
bool resolved = 8;
repeated OperatorMetricsSet stage_metrics = 9;
}
diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs
b/ballista/rust/core/src/serde/scheduler/from_proto.rs
index 970a5878..42a0c67a 100644
--- a/ballista/rust/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -28,7 +28,10 @@ use crate::error::BallistaError;
use crate::serde::protobuf;
use crate::serde::protobuf::action::ActionType;
use crate::serde::protobuf::{operator_metric, NamedCount, NamedGauge,
NamedTime};
-use crate::serde::scheduler::{Action, PartitionId, PartitionLocation,
PartitionStats};
+use crate::serde::scheduler::{
+ Action, ExecutorData, ExecutorMetadata, ExecutorSpecification,
ExecutorState,
+ PartitionId, PartitionLocation, PartitionStats,
+};
impl TryInto<Action> for protobuf::Action {
type Error = BallistaError;
@@ -202,3 +205,79 @@ impl TryInto<MetricsSet> for protobuf::OperatorMetricsSet {
Ok(ms)
}
}
+
+#[allow(clippy::from_over_into)]
+impl Into<ExecutorMetadata> for protobuf::ExecutorMetadata {
+ fn into(self) -> ExecutorMetadata {
+ ExecutorMetadata {
+ id: self.id,
+ host: self.host,
+ port: self.port as u16,
+ grpc_port: self.grpc_port as u16,
+ specification: self.specification.unwrap().into(),
+ }
+ }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<ExecutorSpecification> for protobuf::ExecutorSpecification {
+ fn into(self) -> ExecutorSpecification {
+ let mut ret = ExecutorSpecification { task_slots: 0 };
+ for resource in self.resources {
+ if let
Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
+ resource.resource
+ {
+ ret.task_slots = task_slots
+ }
+ }
+ ret
+ }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<ExecutorData> for protobuf::ExecutorData {
+ fn into(self) -> ExecutorData {
+ let mut ret = ExecutorData {
+ executor_id: self.executor_id,
+ total_task_slots: 0,
+ available_task_slots: 0,
+ };
+ for resource in self.resources {
+ if let Some(task_slots) = resource.total {
+ if let Some(protobuf::executor_resource::Resource::TaskSlots(
+ task_slots,
+ )) = task_slots.resource
+ {
+ ret.total_task_slots = task_slots
+ }
+ };
+ if let Some(task_slots) = resource.available {
+ if let Some(protobuf::executor_resource::Resource::TaskSlots(
+ task_slots,
+ )) = task_slots.resource
+ {
+ ret.available_task_slots = task_slots
+ }
+ };
+ }
+ ret
+ }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<ExecutorState> for protobuf::ExecutorState {
+ fn into(self) -> ExecutorState {
+ let mut ret = ExecutorState {
+ available_memory_size: u64::MAX,
+ };
+ for metric in self.metrics {
+ if let Some(protobuf::executor_metric::Metric::AvailableMemory(
+ available_memory_size,
+ )) = metric.metric
+ {
+ ret.available_memory_size = available_memory_size
+ }
+ }
+ ret
+ }
+}
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs
b/ballista/rust/core/src/serde/scheduler/mod.rs
index 369a87d2..38b350d6 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -26,7 +26,6 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::Partitioning;
use serde::Serialize;
-use super::protobuf;
use crate::error::BallistaError;
pub mod from_proto;
@@ -80,65 +79,12 @@ pub struct ExecutorMetadata {
pub specification: ExecutorSpecification,
}
-#[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorMetadata> for ExecutorMetadata {
- fn into(self) -> protobuf::ExecutorMetadata {
- protobuf::ExecutorMetadata {
- id: self.id,
- host: self.host,
- port: self.port as u32,
- grpc_port: self.grpc_port as u32,
- specification: Some(self.specification.into()),
- }
- }
-}
-
-impl From<protobuf::ExecutorMetadata> for ExecutorMetadata {
- fn from(meta: protobuf::ExecutorMetadata) -> Self {
- Self {
- id: meta.id,
- host: meta.host,
- port: meta.port as u16,
- grpc_port: meta.grpc_port as u16,
- specification: meta.specification.unwrap().into(),
- }
- }
-}
-
/// Specification of an executor, indicting executor resources, like total
task slots
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct ExecutorSpecification {
pub task_slots: u32,
}
-#[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorSpecification> for ExecutorSpecification {
- fn into(self) -> protobuf::ExecutorSpecification {
- protobuf::ExecutorSpecification {
- resources: vec![protobuf::executor_resource::Resource::TaskSlots(
- self.task_slots,
- )]
- .into_iter()
- .map(|r| protobuf::ExecutorResource { resource: Some(r) })
- .collect(),
- }
- }
-}
-
-impl From<protobuf::ExecutorSpecification> for ExecutorSpecification {
- fn from(input: protobuf::ExecutorSpecification) -> Self {
- let mut ret = Self { task_slots: 0 };
- for resource in input.resources {
- if let
Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
- resource.resource
- {
- ret.task_slots = task_slots
- }
- }
- ret
- }
-}
-
/// From Spark, available resources for an executor, like available task slots
#[derive(Debug, Clone, Serialize)]
pub struct ExecutorData {
@@ -152,67 +98,6 @@ pub struct ExecutorDataChange {
pub task_slots: i32,
}
-struct ExecutorResourcePair {
- total: protobuf::executor_resource::Resource,
- available: protobuf::executor_resource::Resource,
-}
-
-#[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorData> for ExecutorData {
- fn into(self) -> protobuf::ExecutorData {
- protobuf::ExecutorData {
- executor_id: self.executor_id,
- resources: vec![ExecutorResourcePair {
- total: protobuf::executor_resource::Resource::TaskSlots(
- self.total_task_slots,
- ),
- available: protobuf::executor_resource::Resource::TaskSlots(
- self.available_task_slots,
- ),
- }]
- .into_iter()
- .map(|r| protobuf::ExecutorResourcePair {
- total: Some(protobuf::ExecutorResource {
- resource: Some(r.total),
- }),
- available: Some(protobuf::ExecutorResource {
- resource: Some(r.available),
- }),
- })
- .collect(),
- }
- }
-}
-
-impl From<protobuf::ExecutorData> for ExecutorData {
- fn from(input: protobuf::ExecutorData) -> Self {
- let mut ret = Self {
- executor_id: input.executor_id,
- total_task_slots: 0,
- available_task_slots: 0,
- };
- for resource in input.resources {
- if let Some(task_slots) = resource.total {
- if let Some(protobuf::executor_resource::Resource::TaskSlots(
- task_slots,
- )) = task_slots.resource
- {
- ret.total_task_slots = task_slots
- }
- };
- if let Some(task_slots) = resource.available {
- if let Some(protobuf::executor_resource::Resource::TaskSlots(
- task_slots,
- )) = task_slots.resource
- {
- ret.available_task_slots = task_slots
- }
- };
- }
- ret
- }
-}
-
/// The internal state of an executor, like cpu usage, memory usage, etc
#[derive(Debug, Clone, Copy, Serialize)]
pub struct ExecutorState {
@@ -220,37 +105,6 @@ pub struct ExecutorState {
pub available_memory_size: u64,
}
-#[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorState> for ExecutorState {
- fn into(self) -> protobuf::ExecutorState {
- protobuf::ExecutorState {
- metrics: vec![protobuf::executor_metric::Metric::AvailableMemory(
- self.available_memory_size,
- )]
- .into_iter()
- .map(|m| protobuf::ExecutorMetric { metric: Some(m) })
- .collect(),
- }
- }
-}
-
-impl From<protobuf::ExecutorState> for ExecutorState {
- fn from(input: protobuf::ExecutorState) -> Self {
- let mut ret = Self {
- available_memory_size: u64::MAX,
- };
- for metric in input.metrics {
- if let Some(protobuf::executor_metric::Metric::AvailableMemory(
- available_memory_size,
- )) = metric.metric
- {
- ret.available_memory_size = available_memory_size
- }
- }
- ret
- }
-}
-
/// Summary of executed partition
#[derive(Debug, Copy, Clone, Default)]
pub struct PartitionStats {
diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs
b/ballista/rust/core/src/serde/scheduler/to_proto.rs
index 815bc96d..7517408b 100644
--- a/ballista/rust/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -21,8 +21,12 @@ use std::convert::TryInto;
use crate::error::BallistaError;
use crate::serde::protobuf;
use crate::serde::protobuf::action::ActionType;
+
use crate::serde::protobuf::{operator_metric, NamedCount, NamedGauge,
NamedTime};
-use crate::serde::scheduler::{Action, PartitionId, PartitionLocation,
PartitionStats};
+use crate::serde::scheduler::{
+ Action, ExecutorData, ExecutorMetadata, ExecutorSpecification,
ExecutorState,
+ PartitionId, PartitionLocation, PartitionStats,
+};
use datafusion::physical_plan::Partitioning;
impl TryInto<protobuf::Action> for Action {
@@ -171,3 +175,76 @@ impl TryInto<protobuf::OperatorMetricsSet> for MetricsSet {
Ok(protobuf::OperatorMetricsSet { metrics })
}
}
+
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::ExecutorMetadata> for ExecutorMetadata {
+ fn into(self) -> protobuf::ExecutorMetadata {
+ protobuf::ExecutorMetadata {
+ id: self.id,
+ host: self.host,
+ port: self.port as u32,
+ grpc_port: self.grpc_port as u32,
+ specification: Some(self.specification.into()),
+ }
+ }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::ExecutorSpecification> for ExecutorSpecification {
+ fn into(self) -> protobuf::ExecutorSpecification {
+ protobuf::ExecutorSpecification {
+ resources: vec![protobuf::executor_resource::Resource::TaskSlots(
+ self.task_slots,
+ )]
+ .into_iter()
+ .map(|r| protobuf::ExecutorResource { resource: Some(r) })
+ .collect(),
+ }
+ }
+}
+
+struct ExecutorResourcePair {
+ total: protobuf::executor_resource::Resource,
+ available: protobuf::executor_resource::Resource,
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::ExecutorData> for ExecutorData {
+ fn into(self) -> protobuf::ExecutorData {
+ protobuf::ExecutorData {
+ executor_id: self.executor_id,
+ resources: vec![ExecutorResourcePair {
+ total: protobuf::executor_resource::Resource::TaskSlots(
+ self.total_task_slots,
+ ),
+ available: protobuf::executor_resource::Resource::TaskSlots(
+ self.available_task_slots,
+ ),
+ }]
+ .into_iter()
+ .map(|r| protobuf::ExecutorResourcePair {
+ total: Some(protobuf::ExecutorResource {
+ resource: Some(r.total),
+ }),
+ available: Some(protobuf::ExecutorResource {
+ resource: Some(r.available),
+ }),
+ })
+ .collect(),
+ }
+ }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::ExecutorState> for ExecutorState {
+ fn into(self) -> protobuf::ExecutorState {
+ protobuf::ExecutorState {
+ metrics: vec![protobuf::executor_metric::Metric::AvailableMemory(
+ self.available_memory_size,
+ )]
+ .into_iter()
+ .map(|m| protobuf::ExecutorMetric { metric: Some(m) })
+ .collect(),
+ }
+ }
+}
diff --git a/ballista/rust/scheduler/src/state/execution_graph.rs
b/ballista/rust/scheduler/src/state/execution_graph.rs
index b13f7c58..d928f969 100644
--- a/ballista/rust/scheduler/src/state/execution_graph.rs
+++ b/ballista/rust/scheduler/src/state/execution_graph.rs
@@ -101,8 +101,8 @@ pub struct ExecutionStage {
/// Status of each already scheduled task. If status is None, the
partition has not yet been scheduled
pub(crate) task_statuses: Vec<Option<task_status::Status>>,
/// Stage ID of the stage that will take this stages outputs as inputs.
- /// If `output_link` is `None` then this the final stage in the
`ExecutionGraph`
- pub(crate) output_link: Option<usize>,
+ /// If `output_links` is empty then this is the final stage in the
`ExecutionGraph`
+ pub(crate) output_links: Vec<usize>,
/// Flag indicating whether all input partitions have been resolved and
the plan
/// has UnresovledShuffleExec operators resolved to ShuffleReadExec
operators.
pub(crate) resolved: bool,
@@ -136,7 +136,7 @@ impl ExecutionStage {
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
output_partitioning: Option<Partitioning>,
- output_link: Option<usize>,
+ output_links: Vec<usize>,
child_stages: Vec<usize>,
) -> Self {
let num_tasks = plan.output_partitioning().partition_count();
@@ -156,7 +156,7 @@ impl ExecutionStage {
inputs,
plan,
task_statuses: vec![None; num_tasks],
- output_link,
+ output_links,
resolved,
stage_metrics: None,
}
@@ -320,7 +320,7 @@ struct ExecutionStageBuilder {
/// Map from stage ID -> List of child stage IDs
stage_dependencies: HashMap<usize, Vec<usize>>,
/// Map from Stage ID -> output link
- output_links: HashMap<usize, usize>,
+ output_links: HashMap<usize, Vec<usize>>,
}
impl ExecutionStageBuilder {
@@ -346,7 +346,7 @@ impl ExecutionStageBuilder {
for stage in stages {
let partitioning = stage.shuffle_output_partitioning().cloned();
let stage_id = stage.stage_id();
- let output_link = self.output_links.remove(&stage_id);
+ let output_links =
self.output_links.remove(&stage_id).unwrap_or_default();
let child_stages = self
.stage_dependencies
@@ -359,7 +359,7 @@ impl ExecutionStageBuilder {
stage_id,
stage,
partitioning,
- output_link,
+ output_links,
child_stages,
),
);
@@ -381,11 +381,21 @@ impl ExecutionPlanVisitor for ExecutionStageBuilder {
} else if let Some(unresolved_shuffle) =
plan.as_any().downcast_ref::<UnresolvedShuffleExec>()
{
- self.output_links
- .insert(unresolved_shuffle.stage_id, self.current_stage_id);
+ if let Some(output_links) =
+ self.output_links.get_mut(&unresolved_shuffle.stage_id)
+ {
+ if !output_links.contains(&self.current_stage_id) {
+ output_links.push(self.current_stage_id);
+ }
+ } else {
+ self.output_links
+ .insert(unresolved_shuffle.stage_id,
vec![self.current_stage_id]);
+ }
if let Some(deps) =
self.stage_dependencies.get_mut(&self.current_stage_id) {
- deps.push(unresolved_shuffle.stage_id)
+ if !deps.contains(&unresolved_shuffle.stage_id) {
+ deps.push(unresolved_shuffle.stage_id);
+ }
} else {
self.stage_dependencies
.insert(self.current_stage_id,
vec![unresolved_shuffle.stage_id]);
@@ -462,10 +472,10 @@ impl Debug for Task {
///
///
/// The DAG structure of this `ExecutionGraph` is encoded in the stages. Each
stage's `input` field
-/// will indicate which stages it depends on, and each stage's `output_link`
will indicate which
+/// will indicate which stages it depends on, and each stage's `output_links`
will indicate which
/// stage it needs to publish its output to.
///
-/// If a stage has `output_link == None` then it is the final stage in this
query, and it should
+/// If a stage's `output_links` is empty then it is the final stage in this
query, and it should
/// publish its outputs to the `ExecutionGraph`s `output_locations`
representing the final query results.
#[derive(Clone)]
pub struct ExecutionGraph {
@@ -616,28 +626,33 @@ impl ExecutionGraph {
completed_task.partitions,
);
- if let Some(link) = stage.output_link {
- // If this is an intermediate stage, we need to
push its `PartitionLocation`s to the parent stage
- if let Some(linked_stage) =
self.stages.get_mut(&link) {
- linked_stage.add_input_partitions(
- stage_id, partition, locations,
- )?;
-
- // If all tasks for this stage are complete,
mark the input complete in the parent stage
- if stage_complete {
- linked_stage.complete_input(stage_id);
- }
-
- // If all input partitions are ready, we can
resolve any UnresolvedShuffleExec in the parent stage plan
- if linked_stage.resolvable() {
- linked_stage.resolve_shuffles()?;
+ let output_links = stage.output_links.clone();
+ if output_links.is_empty() {
+ // If `output_links` is empty, then this is a
final stage
+ self.output_locations.extend(locations);
+ } else {
+ for link in output_links.into_iter() {
+ // If this is an intermediate stage, we need
to push its `PartitionLocation`s to the parent stage
+ if let Some(linked_stage) =
self.stages.get_mut(&link) {
+ linked_stage.add_input_partitions(
+ stage_id,
+ partition,
+ locations.clone(),
+ )?;
+
+ // If all tasks for this stage are
complete, mark the input complete in the parent stage
+ if stage_complete {
+ linked_stage.complete_input(stage_id);
+ }
+
+ // If all input partitions are ready, we
can resolve any UnresolvedShuffleExec in the parent stage plan
+ if linked_stage.resolvable() {
+ linked_stage.resolve_shuffles()?;
+ }
+ } else {
+ return
Err(BallistaError::Internal(format!("Error updating job {}: Invalid output link
{} for stage {}", job_id, stage_id, link)));
}
- } else {
- return
Err(BallistaError::Internal(format!("Error updating job {}: Invalid output link
{} for stage {}", job_id, stage_id, link)));
}
- } else {
- // If `output_link` is `None`, then this is a
final stage
- self.output_locations.extend(locations);
}
}
} else {
diff --git a/ballista/rust/scheduler/src/state/mod.rs
b/ballista/rust/scheduler/src/state/mod.rs
index 1083665f..12546a8e 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -52,7 +52,7 @@ pub fn decode_protobuf<T: Message + Default>(bytes: &[u8]) ->
Result<T> {
})
}
-pub fn decode_into<T: Message + Default, U: From<T>>(bytes: &[u8]) ->
Result<U> {
+pub fn decode_into<T: Message + Default + Into<U>, U>(bytes: &[u8]) ->
Result<U> {
T::decode(bytes)
.map_err(|e| {
BallistaError::Internal(format!(
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs
b/ballista/rust/scheduler/src/state/task_manager.rs
index cc329263..dcee3722 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -233,98 +233,56 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
&self,
reservations: &[ExecutorReservation],
) -> Result<(Vec<(String, Task)>, Vec<ExecutorReservation>, usize)> {
- let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
+ // Ignore any previous job_id: turn every reservation into a free reservation for its executor.
+ let free_reservations: Vec<ExecutorReservation> = reservations
+ .iter()
+ .map(|reservation| {
+ ExecutorReservation::new_free(reservation.executor_id.clone())
+ })
+ .collect();
+ let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
with_lock(lock, async {
- let mut assignments: Vec<(String, Task)> = vec![];
- let mut free_reservations: Vec<ExecutorReservation> = vec![];
- // let _txn_ops: Vec<(Keyspace, String, Vec<u8>)> = vec![];
-
- // Need to collect graphs we update so we can update them in
storage when we are done
- let mut graphs: HashMap<String, ExecutionGraph> = HashMap::new();
-
- // First try and fill reservations for particular jobs. If the job
has no more tasks
- // free the reservation.
- for reservation in reservations {
- debug!(
- "Filling reservation for executor {} from job {:?}",
- reservation.executor_id, reservation.job_id
- );
- let executor_id = &reservation.executor_id;
- if let Some(job_id) = &reservation.job_id {
- if let Some(graph) = graphs.get_mut(job_id) {
- if let Ok(Some(next_task)) =
graph.pop_next_task(executor_id) {
- debug!(
- "Filled reservation for executor {} with task
{:?}",
- executor_id, next_task
- );
- assignments.push((executor_id.clone(), next_task));
- } else {
- debug!("Cannot fill reservation for executor {}
from job {}, freeing reservation", executor_id, job_id);
- free_reservations
-
.push(ExecutorReservation::new_free(executor_id.clone()));
- }
- } else {
- // let lock = self.state.lock(Keyspace::ActiveJobs,
job_id).await?;
- let mut graph =
self.get_execution_graph(job_id).await?;
-
- if let Ok(Some(next_task)) =
graph.pop_next_task(executor_id) {
- debug!(
- "Filled reservation for executor {} with task
{:?}",
- executor_id, next_task
- );
- assignments.push((executor_id.clone(), next_task));
- graphs.insert(job_id.clone(), graph);
- // locks.push(lock);
- } else {
- debug!("Cannot fill reservation for executor {}
from job {}, freeing reservation", executor_id, job_id);
- free_reservations
-
.push(ExecutorReservation::new_free(executor_id.clone()));
- }
- }
- } else {
- free_reservations.push(reservation.clone());
- }
- }
-
- let mut other_jobs: Vec<String> =
+ let mut jobs: Vec<String> =
self.get_active_jobs().await?.into_iter().collect();
+ let mut assignments: Vec<(String, Task)> = vec![];
let mut unassigned: Vec<ExecutorReservation> = vec![];
+ // Need to collect graphs we update so we can update them in
storage when we are done
+ let mut graphs: HashMap<String, ExecutionGraph> = HashMap::new();
// Now try and find tasks for free reservations from current set
of graphs
for reservation in free_reservations {
debug!(
- "Filling free reservation for executor {}",
- reservation.executor_id
- );
+ "Filling free reservation for executor {}",
+ reservation.executor_id
+ );
let mut assigned = false;
let executor_id = reservation.executor_id.clone();
// Try and find a task in the graphs we already have locks on
if let Ok(Some(assignment)) = find_next_task(&executor_id,
&mut graphs) {
debug!(
- "Filled free reservation for executor {} with task {:?}",
- reservation.executor_id, assignment.1
- );
+ "Filled free reservation for executor {} with task
{:?}",
+ reservation.executor_id, assignment.1
+ );
// First check if we can find another task
assignments.push(assignment);
assigned = true;
} else {
// Otherwise start searching through other active jobs.
debug!(
- "Filling free reservation for executor {} from active jobs
{:?}",
- reservation.executor_id, other_jobs
- );
- while let Some(job_id) = other_jobs.pop() {
+ "Filling free reservation for executor {} from active
jobs {:?}",
+ reservation.executor_id, jobs
+ );
+ while let Some(job_id) = jobs.pop() {
if graphs.get(&job_id).is_none() {
- // let lock =
self.state.lock(Keyspace::ActiveJobs, &job_id).await?;
let mut graph =
self.get_execution_graph(&job_id).await?;
if let Ok(Some(task)) =
graph.pop_next_task(&executor_id) {
debug!(
- "Filled free reservation for executor {} with
task {:?}",
- reservation.executor_id, task
- );
+ "Filled free reservation for executor {}
with task {:?}",
+ reservation.executor_id, task
+ );
assignments.push((executor_id.clone(), task));
// locks.push(lock);
graphs.insert(job_id, graph);
@@ -339,9 +297,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
if !assigned {
debug!(
- "Unable to fill reservation for executor {}, no tasks
available",
- executor_id
- );
+ "Unable to fill reservation for executor {}, no tasks
available",
+ executor_id
+ );
unassigned.push(reservation);
}
}
@@ -547,15 +505,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
}
}
- // This is a little hacky but since we can't make an optional
- // primitive field in protobuf, we just use 0 to encode None.
- // Should work since stage IDs are 1-indexed.
- let output_link = if stage.output_link == 0 {
- None
- } else {
- Some(stage.output_link as usize)
- };
-
let output_partitioning: Option<Partitioning> =
parse_protobuf_hash_partitioning(
stage.output_partitioning.as_ref(),
@@ -608,7 +557,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
inputs,
plan,
task_statuses,
- output_link,
+ output_links: stage
+ .output_links
+ .into_iter()
+ .map(|l| l as usize)
+ .collect(),
resolved: stage.resolved,
stage_metrics,
};
@@ -642,15 +595,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
.stages
.into_iter()
.map(|(stage_id, stage)| {
- // This is a little hacky but since we can't make an optional
- // primitive field in protobuf, we just use 0 to encode None.
- // Should work since stage IDs are 1-indexed.
- let output_link = if let Some(link) = stage.output_link {
- link as u32
- } else {
- 0
- };
-
let mut plan: Vec<u8> = vec![];
U::try_from_physical_plan(
@@ -715,7 +659,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
inputs,
plan,
task_statuses,
- output_link,
+ output_links: stage
+ .output_links
+ .into_iter()
+ .map(|l| l as u32)
+ .collect(),
resolved: stage.resolved,
stage_metrics,
})