github-actions[bot] opened a new pull request, #51199: URL: https://github.com/apache/airflow/pull/51199
When using `dag.test()` with deferred tasks, tasks that complete their trigger execution were incorrectly being set to `SUCCESS` state instead of `SCHEDULED` state. This prevented task resumption! Dag used to test: ```python from datetime import datetime, timedelta, timezone from typing import Any import pendulum from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.sdk import Context, task, BaseOperator, DAG class DummyOperator(BaseOperator): def execute(self, context: Context): self.defer( trigger=DateTimeTrigger( moment=datetime.now(timezone.utc) + timedelta(seconds=2), ), method_name="execute_complet", ) def execute_complet(self, context: Context, event: Any = None): assert event is not None return "test" @task def dummy_task(param): print("DEBUG") assert param == "test", "Parameter should be 'test'" with DAG( dag_id="example_debug", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), ) as dag: task1 = DummyOperator(task_id="task1") task2 = dummy_task(task1.output) task1 >> task2 if __name__ == "__main__": dag.test() ``` (cherry picked from commit 1b83f7135d4b5167972ec32752642924e8e0a55a) Co-authored-by: Kaxil Naik <kaxiln...@gmail.com> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org