jose-workpath commented on issue #28973: URL: https://github.com/apache/airflow/issues/28973#issuecomment-1495924344
I also have this error on Airflow `2.5.3`.

I found out that it also happens when two dynamically mapped tasks depend on each other and the child task of the last mapped task has another parent task, i.e. a DAG like this:

```mermaid
graph LR
    A --> B[B mapped-task] --> C[C mapped-task] --> E
    D --> E
```

In my case, I cannot do what @darkfennertrader suggested, because I use the result of the first mapped task as a parameter of the next mapped task.

I created a simple DAG to reproduce the error I'm getting:

```python
"""
Some bug causes a DAG like this to always get skipped when there is more than
one mapped task and the task downstream of the last expanded task has another
upstream task.

ie: `A -> B (mapped) -> C (mapped) -> E -> ...`
and: `D -> E`

This immediately causes `C`, `E` and any downstream of `E` to be skipped.

In this example DAG, the tasks `make_it_double` (C), `wrap_up` (E) and
everything downstream of them are marked 'skipped' as soon as the DAG is
triggered and are never scheduled for execution.
"""
import datetime
from typing import List

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator

# Project-specific default_args, not part of Airflow; substitute your own.
from common.configs.airflow import DEFAULT_ARGS


@task
def generate_list_of_numbers() -> List[List[int]]:
    """
    Returns a list of length 2, each sublist containing the integers 1 to 3
    """
    return [list(range(1, 4)) for _ in range(2)]


@task
def add_them_together(numbers: List[int]) -> int:
    """
    Adds the numbers together
    """
    return sum(numbers)


@task
def make_it_double(summed: int) -> int:
    """
    Multiplies by 2
    """
    return summed * 2


@task
def wrap_up(results) -> None:
    """
    Prints the results
    """
    print(results)


@task
def some_upstream() -> None:
    print("Can be any kind of upstream python task.")


with DAG(
    dag_id="reproduce_skipped_expansions_error",
    schedule_interval=None,
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
    doc_md=__doc__,
    default_args=DEFAULT_ARGS,
) as dag:
    a_numbers = generate_list_of_numbers.override(
        task_id="a_generate_list_of_numbers"
    )()
    # Expected result: [[1, 2, 3], [1, 2, 3]]

    b_added_numbers = add_them_together.override(
        task_id="b_add_them_together"
    ).expand(numbers=a_numbers)
    # Expected result: [6, 6]

    c_multiplied = make_it_double.override(task_id="c_make_it_double").expand(
        summed=b_added_numbers
    )
    # Expected result: [12, 12]

    d_dep = some_upstream.override(task_id="d_some_upstream")()

    # Just prints 'multiplied':
    e_wrap = wrap_up.override(task_id="e_wrap_up")(results=c_multiplied)

    # Define order of tasks:
    d_dep >> e_wrap
    e_wrap >> EmptyOperator(task_id="f_done")

if __name__ == "__main__":
    dag.cli()
```