feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared URL: https://github.com/apache/airflow/pull/7276#discussion_r378244504
########## File path: tests/operators/test_python.py ########## @@ -541,6 +542,62 @@ def test_xcom_push(self): self.assertEqual( ti.xcom_pull(task_ids='make_choice'), 'branch_1') + def test_clear_skipped_downstream_task(self): + """ + After a downstream task is skipped by BranchPythonOperator, clearing the skipped task + should not cause it to be executed. + """ + branch_op = BranchPythonOperator(task_id='make_choice', + dag=self.dag, + python_callable=lambda: 'branch_1') + branches = [self.branch_1, self.branch_2] + branch_op >> branches + self.dag.clear() + + dr = self.dag.create_dagrun( + run_id="manual__", + start_date=timezone.utcnow(), + execution_date=DEFAULT_DATE, + state=State.RUNNING + ) + + branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + for task in branches: + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + tis = dr.get_task_instances() + for ti in tis: + if ti.task_id == 'make_choice': + self.assertEqual(ti.state, State.SUCCESS) + elif ti.task_id == 'branch_1': + self.assertEqual(ti.state, State.SUCCESS) + elif ti.task_id == 'branch_2': + self.assertEqual(ti.state, State.SKIPPED) + else: + raise Exception Review comment: ```suggestion raise ValueError(f'Invalid task id {ti.task_id} found!') ``` WDYT? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services