jose-workpath commented on issue #28973:
URL: https://github.com/apache/airflow/issues/28973#issuecomment-1495924344

   I also have this error on Airflow `2.5.3`.
   
   I found out that it also happens when two dynamically mapped tasks are
   chained (one depends on the other) and the child task of the last mapped
   task has another parent task, i.e. a DAG like this:
   
   ```mermaid
   graph LR
       A --> B[B mapped-task] --> C[C mapped-task] --> E
       D --> E
   ```
   
   In my case, I cannot do what @darkfennertrader suggested, because I use the
   result of the first mapped task as a parameter of the next mapped task.
   
   I created a simple DAG to reproduce the error I'm getting:
   
   ```python
   """
   Some bug causes tasks in a DAG like this to always be skipped
   when there is more than one mapped task and the downstream task
   of the last expanded task has another upstream task, i.e.:
   
   
   `A -> B (mapped) -> C (mapped) -> E -> ...`
   
   and:
   
   `D -> E`
   
   will immediately cause `C`, `E`, and any downstream of `E` to be skipped.
   
   In this DAG example you will see how the tasks `make_it_double` (C),
   `wrap_up` (E), and any downstream tasks immediately get a 'skipped' status
   as soon as the DAG is triggered and are not scheduled for execution.
   """
   
   import datetime
   from typing import List
   
   from airflow import DAG
   from airflow.operators.empty import EmptyOperator
   from airflow.decorators import task
   # Project-specific default_args; replace with your own dict to run this example.
   from common.configs.airflow import DEFAULT_ARGS
   
   
   @task
   def generate_list_of_numbers() -> List[List[int]]:
       """
       Returns a list of 2 sublists, each containing the integers 1 to 3.
       """
       # range(1, 4) yields 1, 2, 3, matching the expected results noted below.
       return [list(range(1, 4)) for _ in range(2)]
   
   
   @task
   def add_them_together(numbers: List[int]):
       """
       Add the numbers together
       """
       return sum(numbers)
   
   
   @task
   def make_it_double(summed: int):
       """
       Multiplies by 2
       """
       return summed * 2
   
   
   @task
   def wrap_up(results) -> None:
       """
       Prints the results
       """
       print(results)
   
   
   @task
   def some_upstream():
       print("Can be any kind of upstream python task.")
   
   
   with DAG(
       dag_id="reproduce_skipped_expansions_error",
       schedule_interval=None,
       catchup=False,
       dagrun_timeout=datetime.timedelta(minutes=60),
       doc_md=__doc__,
       default_args=DEFAULT_ARGS,
   ) as dag:
   
       a_numbers = generate_list_of_numbers.override(
           task_id="a_generate_list_of_numbers"
       )()
       # Expected result: [[1,2,3], [1,2,3]]
       b_added_numbers = add_them_together.override(
           task_id="b_add_them_together"
       ).expand(numbers=a_numbers)
       # Expected result: [6, 6]
       c_multiplied = make_it_double.override(task_id="c_make_it_double").expand(
           summed=b_added_numbers
       )
       # Expected result: [12, 12]
       d_dep = some_upstream.override(task_id="d_some_upstream")()
       # Just prints 'multiplied':
       e_wrap = wrap_up.override(task_id="e_wrap_up")(results=c_multiplied)
       # Define order of tasks:
       d_dep >> e_wrap
       e_wrap >> EmptyOperator(task_id="f_done")
   
   if __name__ == "__main__":
       dag.cli()
   ```
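
For reference, the values stated in the "Expected result" comments can be checked without Airflow. The sketch below is plain Python that emulates the two mapped steps by hand; the helper name `expected_results` is hypothetical and not part of the DAG, and it assumes task A returns the integers 1 to 3 in each sublist, as the comments describe.

```python
from typing import List


def generate_list_of_numbers() -> List[List[int]]:
    # Same shape as task A in the comments: two sublists holding 1, 2, 3.
    return [list(range(1, 4)) for _ in range(2)]


def expected_results() -> List[int]:
    # Hypothetical helper: emulates the mapped tasks B and C with list
    # comprehensions instead of Airflow's .expand().
    numbers = generate_list_of_numbers()
    summed = [sum(sublist) for sublist in numbers]  # B: one sum per sublist
    doubled = [value * 2 for value in summed]  # C: one doubling per sum
    return doubled


if __name__ == "__main__":
    print(expected_results())  # prints [12, 12]
```

If the mapped tasks were scheduled instead of skipped, `e_wrap_up` would be expected to print the same list.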

