This is an automated email from the ASF dual-hosted git repository.

yangjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 48c4c2d9 [minor] remove useless bracelet (#739)
48c4c2d9 is described below

commit 48c4c2d92b846c629d7ded44b3a26f334f27a673
Author: Yang Jiang <[email protected]>
AuthorDate: Tue Apr 11 18:01:17 2023 +0800

    [minor] remove useless bracelet (#739)
---
 ballista/scheduler/src/state/execution_graph.rs | 280 ++++++++++++------------
 1 file changed, 135 insertions(+), 145 deletions(-)

diff --git a/ballista/scheduler/src/state/execution_graph.rs 
b/ballista/scheduler/src/state/execution_graph.rs
index 4f98e6b9..339649b4 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -311,161 +311,152 @@ impl ExecutionGraph {
                 if let ExecutionStage::Running(running_stage) = stage {
                     let mut locations = vec![];
                     for task_status in stage_task_statuses.into_iter() {
-                        {
-                            let task_stage_attempt_num =
-                                task_status.stage_attempt_num as usize;
-                            if task_stage_attempt_num < 
running_stage.stage_attempt_num {
-                                warn!("Ignore TaskStatus update with TID {} as 
it's from Stage {}.{} and there is a more recent stage attempt {}.{} running",
+                        let task_stage_attempt_num =
+                            task_status.stage_attempt_num as usize;
+                        if task_stage_attempt_num < 
running_stage.stage_attempt_num {
+                            warn!("Ignore TaskStatus update with TID {} as 
it's from Stage {}.{} and there is a more recent stage attempt {}.{} running",
                                     task_status.task_id, stage_id, 
task_stage_attempt_num, stage_id, running_stage.stage_attempt_num);
-                                continue;
-                            }
-                            let partition_id = 
task_status.clone().partition_id as usize;
-                            let task_identity = format!(
-                                "TID {} {}/{}.{}/{}",
-                                task_status.task_id,
-                                job_id,
-                                stage_id,
-                                task_stage_attempt_num,
-                                partition_id
-                            );
-                            let operator_metrics = task_status.metrics.clone();
+                            continue;
+                        }
+                        let partition_id = task_status.clone().partition_id as 
usize;
+                        let task_identity = format!(
+                            "TID {} {}/{}.{}/{}",
+                            task_status.task_id,
+                            job_id,
+                            stage_id,
+                            task_stage_attempt_num,
+                            partition_id
+                        );
+                        let operator_metrics = task_status.metrics.clone();
 
-                            if !running_stage
-                                .update_task_info(partition_id, 
task_status.clone())
-                            {
-                                continue;
-                            }
+                        if !running_stage
+                            .update_task_info(partition_id, 
task_status.clone())
+                        {
+                            continue;
+                        }
 
-                            if let 
Some(task_status::Status::Failed(failed_task)) =
-                                task_status.status
-                            {
-                                let failed_reason = failed_task.failed_reason;
+                        if let Some(task_status::Status::Failed(failed_task)) =
+                            task_status.status
+                        {
+                            let failed_reason = failed_task.failed_reason;
+
+                            match failed_reason {
+                                Some(FailedReason::FetchPartitionError(
+                                    fetch_partiton_error,
+                                )) => {
+                                    let failed_attempts = failed_stage_attempts
+                                        .entry(stage_id)
+                                        .or_insert_with(HashSet::new);
+                                    
failed_attempts.insert(task_stage_attempt_num);
+                                    if failed_attempts.len() < 
max_stage_failures {
+                                        let map_stage_id =
+                                            fetch_partiton_error.map_stage_id 
as usize;
+                                        let map_partition_id = 
fetch_partiton_error
+                                            .map_partition_id
+                                            as usize;
+                                        let executor_id =
+                                            fetch_partiton_error.executor_id;
 
-                                match failed_reason {
-                                    Some(FailedReason::FetchPartitionError(
-                                        fetch_partiton_error,
-                                    )) => {
-                                        let failed_attempts = 
failed_stage_attempts
-                                            .entry(stage_id)
-                                            .or_insert_with(HashSet::new);
-                                        
failed_attempts.insert(task_stage_attempt_num);
-                                        if failed_attempts.len() < 
max_stage_failures {
-                                            let map_stage_id = 
fetch_partiton_error
-                                                .map_stage_id
-                                                as usize;
-                                            let map_partition_id = 
fetch_partiton_error
-                                                .map_partition_id
-                                                as usize;
-                                            let executor_id =
-                                                
fetch_partiton_error.executor_id;
-
-                                            if !failed_stages.is_empty() {
-                                                let error_msg = format!(
-                                                        "Stages was marked 
failed, ignore FetchPartitionError from task {task_identity}");
-                                                warn!("{}", error_msg);
-                                            } else {
-                                                // There are different removal 
strategies here.
-                                                // We can choose just remove 
the map_partition_id in the FetchPartitionError, when resubmit the input stage, 
there are less tasks
-                                                // need to rerun, but this 
might miss many more bad input partitions, lead to more stage level retries in 
following.
-                                                // Here we choose remove all 
the bad input partitions which match the same executor id in this single input 
stage.
-                                                // There are other more 
aggressive approaches, like considering the executor is lost and check all the 
running stages in this graph.
-                                                // Or count the fetch failure 
number on executor and mark the executor lost globally.
-                                                let removed_map_partitions =
-                                                    running_stage
-                                                        
.remove_input_partitions(
-                                                            map_stage_id,
-                                                            map_partition_id,
-                                                            &executor_id,
-                                                        )?;
-
-                                                let failure_reasons =
-                                                    rollback_running_stages
-                                                        .entry(stage_id)
-                                                        
.or_insert_with(HashSet::new);
-                                                
failure_reasons.insert(executor_id);
-
-                                                let missing_inputs =
-                                                    resubmit_successful_stages
-                                                        .entry(map_stage_id)
-                                                        
.or_insert_with(HashSet::new);
-                                                missing_inputs
-                                                    
.extend(removed_map_partitions);
-                                                warn!("Need to resubmit the 
current running Stage {} and its map Stage {} due to FetchPartitionError from 
task {}",
-                                                    stage_id, map_stage_id, 
task_identity)
-                                            }
-                                        } else {
+                                        if !failed_stages.is_empty() {
                                             let error_msg = format!(
-                                                "Stage {} has failed {} times, 
\
-                                            most recent failure reason: {:?}",
-                                                stage_id,
-                                                max_stage_failures,
-                                                failed_task.error
-                                            );
-                                            error!("{}", error_msg);
-                                            failed_stages.insert(stage_id, 
error_msg);
+                                                "Stages was marked failed, 
ignore FetchPartitionError from task {task_identity}");
+                                            warn!("{}", error_msg);
+                                        } else {
+                                            // There are different removal 
strategies here.
+                                            // We can choose just remove the 
map_partition_id in the FetchPartitionError, when resubmit the input stage, 
there are less tasks
+                                            // need to rerun, but this might 
miss many more bad input partitions, lead to more stage level retries in 
following.
+                                            // Here we choose remove all the 
bad input partitions which match the same executor id in this single input 
stage.
+                                            // There are other more aggressive 
approaches, like considering the executor is lost and check all the running 
stages in this graph.
+                                            // Or count the fetch failure 
number on executor and mark the executor lost globally.
+                                            let removed_map_partitions = 
running_stage
+                                                .remove_input_partitions(
+                                                    map_stage_id,
+                                                    map_partition_id,
+                                                    &executor_id,
+                                                )?;
+
+                                            let failure_reasons = 
rollback_running_stages
+                                                .entry(stage_id)
+                                                .or_insert_with(HashSet::new);
+                                            
failure_reasons.insert(executor_id);
+
+                                            let missing_inputs =
+                                                resubmit_successful_stages
+                                                    .entry(map_stage_id)
+                                                    
.or_insert_with(HashSet::new);
+                                            
missing_inputs.extend(removed_map_partitions);
+                                            warn!("Need to resubmit the 
current running Stage {} and its map Stage {} due to FetchPartitionError from 
task {}",
+                                                    stage_id, map_stage_id, 
task_identity)
                                         }
+                                    } else {
+                                        let error_msg = format!(
+                                            "Stage {} has failed {} times, \
+                                            most recent failure reason: {:?}",
+                                            stage_id,
+                                            max_stage_failures,
+                                            failed_task.error
+                                        );
+                                        error!("{}", error_msg);
+                                        failed_stages.insert(stage_id, 
error_msg);
                                     }
-                                    Some(FailedReason::ExecutionError(_)) => {
-                                        failed_stages.insert(stage_id, 
failed_task.error);
-                                    }
-                                    Some(_) => {
-                                        if failed_task.retryable
-                                            && failed_task.count_to_failures
+                                }
+                                Some(FailedReason::ExecutionError(_)) => {
+                                    failed_stages.insert(stage_id, 
failed_task.error);
+                                }
+                                Some(_) => {
+                                    if failed_task.retryable
+                                        && failed_task.count_to_failures
+                                    {
+                                        if 
running_stage.task_failure_number(partition_id)
+                                            < max_task_failures
                                         {
-                                            if running_stage
-                                                
.task_failure_number(partition_id)
-                                                < max_task_failures
-                                            {
-                                                // TODO add new struct to 
track all the failed task infos
-                                                // The failure TaskInfo is 
ignored and set to None here
-                                                running_stage
-                                                    
.reset_task_info(partition_id);
-                                            } else {
-                                                let error_msg = format!(
-                        "Task {} in Stage {} failed {} times, fail the stage, 
most recent failure reason: {:?}",
-                        partition_id, stage_id, max_task_failures, 
failed_task.error
-                    );
-                                                error!("{}", error_msg);
-                                                failed_stages.insert(stage_id, 
error_msg);
-                                            }
-                                        } else if failed_task.retryable {
                                             // TODO add new struct to track 
all the failed task infos
                                             // The failure TaskInfo is ignored 
and set to None here
                                             
running_stage.reset_task_info(partition_id);
+                                        } else {
+                                            let error_msg = format!(
+                                                "Task {} in Stage {} failed {} 
times, fail the stage, most recent failure reason: {:?}",
+                                                partition_id, stage_id, 
max_task_failures, failed_task.error
+                                            );
+                                            error!("{}", error_msg);
+                                            failed_stages.insert(stage_id, 
error_msg);
                                         }
-                                    }
-                                    None => {
-                                        let error_msg = format!(
-                                            "Task {partition_id} in Stage 
{stage_id} failed with unknown failure reasons, fail the stage");
-                                        error!("{}", error_msg);
-                                        failed_stages.insert(stage_id, 
error_msg);
+                                    } else if failed_task.retryable {
+                                        // TODO add new struct to track all 
the failed task infos
+                                        // The failure TaskInfo is ignored and 
set to None here
+                                        
running_stage.reset_task_info(partition_id);
                                     }
                                 }
-                            } else if let Some(task_status::Status::Successful(
-                                successful_task,
-                            )) = task_status.status
-                            {
-                                // update task metrics for successfu task
-                                running_stage.update_task_metrics(
-                                    partition_id,
-                                    operator_metrics,
-                                )?;
-
-                                locations.append(&mut partition_to_location(
-                                    &job_id,
-                                    partition_id,
-                                    stage_id,
-                                    executor,
-                                    successful_task.partitions,
-                                ));
-                            } else {
-                                warn!(
-                                    "The task {}'s status is invalid for 
updating",
-                                    task_identity
-                                );
+                                None => {
+                                    let error_msg = format!(
+                                        "Task {partition_id} in Stage 
{stage_id} failed with unknown failure reasons, fail the stage");
+                                    error!("{}", error_msg);
+                                    failed_stages.insert(stage_id, error_msg);
+                                }
                             }
+                        } else if let Some(task_status::Status::Successful(
+                            successful_task,
+                        )) = task_status.status
+                        {
+                            // update task metrics for successfu task
+                            running_stage
+                                .update_task_metrics(partition_id, 
operator_metrics)?;
+
+                            locations.append(&mut partition_to_location(
+                                &job_id,
+                                partition_id,
+                                stage_id,
+                                executor,
+                                successful_task.partitions,
+                            ));
+                        } else {
+                            warn!(
+                                "The task {}'s status is invalid for updating",
+                                task_identity
+                            );
                         }
                     }
+
                     let is_final_successful = running_stage.is_successful()
                         && !reset_running_stages.contains_key(&stage_id);
                     if is_final_successful {
@@ -899,9 +890,9 @@ impl ExecutionGraph {
                 let task_info = TaskInfo {
                     task_id,
                     scheduled_time: SystemTime::now()
-                    .duration_since(UNIX_EPOCH)
-                    .unwrap()
-                    .as_millis(),
+                        .duration_since(UNIX_EPOCH)
+                        .unwrap()
+                        .as_millis(),
                     // Those times will be updated when the task finish
                     launch_time: 0,
                     start_exec_time: 0,
@@ -1040,15 +1031,14 @@ impl ExecutionGraph {
                             warn!(
                             "Roll back resolved job/stage {}/{} and change 
ShuffleReaderExec back to UnresolvedShuffleExec",
                             job_id, stage_id);
-
-                        },
+                        }
                         ExecutionStage::Running(_) => {
                             rollback_running_stages.insert(*stage_id);
                             warn!(
                             "Roll back running job/stage {}/{} and change 
ShuffleReaderExec back to UnresolvedShuffleExec",
                             job_id, stage_id);
-                        },
-                        _ => {},
+                        }
+                        _ => {}
                     }
                 }
             });

Reply via email to